Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/07 18:04:08 UTC
[01/24] hadoop git commit: YARN-3793. Several NPEs when deleting local files on NM recovery. Contributed by Varun Saxena
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 93cebb2e2 -> 83f39a32c
YARN-3793. Several NPEs when deleting local files on NM recovery. Contributed by Varun Saxena
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5cdf78e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5cdf78e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5cdf78e
Branch: refs/heads/HDFS-7240
Commit: b5cdf78e8e6cd6c5c1fb7286207dac72be32c0d6
Parents: eac1d18
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jul 1 21:13:32 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jul 1 21:13:32 2015 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../logaggregation/AppLogAggregatorImpl.java | 6 ++--
.../TestLogAggregationService.java | 36 ++++++++++++++++++++
3 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5cdf78e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4389e27..3620a71 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -598,6 +598,9 @@ Release 2.7.2 - UNRELEASED
BUG FIXES
+ YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
+ Saxena via jlowe)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5cdf78e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 4b95a03..654eb0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -276,10 +276,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
aggregator.doContainerLogAggregation(writer, appFinished);
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
+ this.delService.delete(this.userUgi.getShortUserName(), null,
+ uploadedFilePathsInThisCycle
+ .toArray(new Path[uploadedFilePathsInThisCycle.size()]));
}
- this.delService.delete(this.userUgi.getShortUserName(), null,
- uploadedFilePathsInThisCycle
- .toArray(new Path[uploadedFilePathsInThisCycle.size()]));
// This container is finished, and all its logs have been uploaded,
// remove it from containerLogAggregators.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5cdf78e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index fd97cef..6a3d270 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -280,6 +281,41 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
verifyLocalFileDeletion(logAggregationService);
}
+ /* Test to verify fix for YARN-3793 */
+ @Test
+ public void testNoLogsUploadedOnAppFinish() throws Exception {
+ this.delSrvc = new DeletionService(createContainerExecutor());
+ delSrvc = spy(delSrvc);
+ this.delSrvc.init(conf);
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+
+ LogAggregationService logAggregationService = new LogAggregationService(
+ dispatcher, this.context, this.delSrvc, super.dirsHandler);
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ApplicationId app = BuilderUtils.newApplicationId(1234, 1);
+ File appLogDir = new File(localLogDir, ConverterUtils.toString(app));
+ appLogDir.mkdir();
+ LogAggregationContext context =
+ LogAggregationContext.newInstance("HOST*", "sys*");
+ logAggregationService.handle(new LogHandlerAppStartedEvent(app, this.user,
+ null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, context));
+
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(app, 1);
+ ContainerId cont = BuilderUtils.newContainerId(appAttemptId, 1);
+ writeContainerLogs(appLogDir, cont, new String[] { "stdout",
+ "stderr", "syslog" });
+ logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0));
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(app));
+ logAggregationService.stop();
+ delSrvc.stop();
+ // Aggregated logs should not be deleted if not uploaded.
+ verify(delSrvc, times(0)).delete(user, null);
+ }
@Test
public void testNoContainerOnNode() throws Exception {
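
The substance of the fix above is a guard: the deletion request is issued only when the aggregation cycle actually uploaded files, so the DeletionService never sees empty or stale state after an NM restart. A minimal, self-contained sketch of that pattern follows; DeletionService is reduced to a hypothetical interface and String paths stand in for org.apache.hadoop.fs.Path:

    import java.util.HashSet;
    import java.util.Set;

    interface DeletionService {
      void delete(String user, String subDir, String... paths);
    }

    class UploadCycleSketch {
      private final Set<String> uploadedFilePathsInThisCycle = new HashSet<>();
      private boolean uploadedLogsInThisCycle = false;

      void afterContainerLogAggregation(DeletionService delService, String user) {
        // Only schedule deletion when this cycle actually uploaded files.
        // The pre-fix code called delete() unconditionally, which after an
        // NM restart could hand the service stale or empty state and NPE.
        if (!uploadedFilePathsInThisCycle.isEmpty()) {
          uploadedLogsInThisCycle = true;
          delService.delete(user, null, uploadedFilePathsInThisCycle
              .toArray(new String[uploadedFilePathsInThisCycle.size()]));
        }
      }
    }

The accompanying test mirrors this: it finishes an application without any successful upload and verifies the DeletionService was never asked to delete.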
[06/24] hadoop git commit: YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id properly in log. Contributed by Bibin A Chundatt.
Posted by ar...@apache.org.
YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id
properly in log. Contributed by Bibin A Chundatt.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/37d73957
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/37d73957
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/37d73957
Branch: refs/heads/HDFS-7240
Commit: 37d7395773b5bb24aa522db38a2602df9a5ac184
Parents: f379622
Author: Devaraj K <de...@apache.org>
Authored: Thu Jul 2 10:20:31 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Thu Jul 2 10:20:31 2015 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +++
.../resourcemanager/scheduler/fair/FSSchedulerNode.java | 11 ++++++-----
2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37d73957/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6839548..2009b47 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -586,6 +586,9 @@ Release 2.8.0 - UNRELEASED
YARN-3830. AbstractYarnScheduler.createReleaseCache may try to clean a null
attempt. (nijel via devaraj)
+ YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id
+ properly in log. (Bibin A Chundatt via devaraj)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37d73957/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index be08dff..c86201a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -68,12 +68,13 @@ public class FSSchedulerNode extends SchedulerNode {
" on node " + this);
}
- LOG.info("Updated reserved container " +
- container.getContainer().getId() + " on node " +
- this + " for application " + application);
+ LOG.info("Updated reserved container " + container.getContainer().getId()
+ + " on node " + this + " for application "
+ + application.getApplicationId());
} else {
- LOG.info("Reserved container " + container.getContainer().getId() +
- " on node " + this + " for application " + application);
+ LOG.info("Reserved container " + container.getContainer().getId()
+ + " on node " + this + " for application "
+ + application.getApplicationId());
}
setReservedContainer(container);
this.reservedAppSchedulable = (FSAppAttempt) application;
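
The change above is purely about what gets logged: application.getApplicationId() instead of the whole application object, whose toString() does not print the id in the expected form. A minimal sketch of the corrected message construction, using a hypothetical Application interface:

    class ReservationLogSketch {
      /** Hypothetical minimal application type for illustration only. */
      interface Application {
        String getApplicationId();
      }

      static String reservedMessage(String containerId, String node, Application app) {
        // Print the id explicitly; the application object's toString() is not
        // guaranteed to render the application id in a readable form.
        return "Reserved container " + containerId + " on node " + node
            + " for application " + app.getApplicationId();
      }
    }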
[20/24] hadoop git commit: YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong prefix for auth options. Contributed by Bibin A Chundatt.
Posted by ar...@apache.org.
YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong
prefix for auth options. Contributed by Bibin A Chundatt.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/af63427c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/af63427c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/af63427c
Branch: refs/heads/HDFS-7240
Commit: af63427c6d7d2fc251eafb1f152b7a90c5bd07e5
Parents: 81f3644
Author: Devaraj K <de...@apache.org>
Authored: Tue Jul 7 12:06:30 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Tue Jul 7 12:06:30 2015 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +++
.../security/TimelineAuthenticationFilterInitializer.java | 5 ++---
2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af63427c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f35be75..8f63187 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -592,6 +592,9 @@ Release 2.8.0 - UNRELEASED
YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner
after create them. (zhihai xu via xgong)
+ YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong
+ prefix for auth options. (Bibin A Chundatt via devaraj)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af63427c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
index a3c136c..4e7c29a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
@@ -62,9 +62,8 @@ public class TimelineAuthenticationFilterInitializer extends FilterInitializer {
* Initializes {@link TimelineAuthenticationFilter}
* <p>
* Propagates to {@link TimelineAuthenticationFilter} configuration all YARN
- * configuration properties prefixed with
- * {@code yarn.timeline-service.authentication.}
- *
+ * configuration properties prefixed with {@value #PREFIX}
+ *
* @param container
* The filter container
* @param conf
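
The javadoc fix above swaps a hard-coded property prefix for the {@value #PREFIX} tag, so the generated documentation always reflects the constant's actual value. A small sketch of the pattern with a hypothetical class and prefix:

    public class ExampleFilterInitializer {
      /** Prefix of the configuration properties this initializer consumes. */
      public static final String PREFIX = "example.http-authentication.";

      /**
       * Propagates all configuration properties prefixed with {@value #PREFIX}
       * to the underlying filter. Because {@value} expands the constant at
       * javadoc-generation time, the documented prefix cannot drift out of
       * sync with the code, which is exactly the class of bug fixed above.
       */
      public void initFilter() {
        // filter wiring elided
      }
    }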
[21/24] hadoop git commit: HADOOP-12117. Potential NPE from Configuration#loadProperty with allowNullValueProperties set. (Contributed by zhihai xu)
Posted by ar...@apache.org.
HADOOP-12117. Potential NPE from Configuration#loadProperty with allowNullValueProperties set. (Contributed by zhihai xu)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/99c8c583
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/99c8c583
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/99c8c583
Branch: refs/heads/HDFS-7240
Commit: 99c8c5839b65666e6099116e4d7024e0eb4682b9
Parents: af63427
Author: Vinayakumar B <vi...@apache.org>
Authored: Tue Jul 7 16:11:27 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Tue Jul 7 16:11:27 2015 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++
.../java/org/apache/hadoop/conf/Configuration.java | 8 ++++----
.../org/apache/hadoop/conf/TestConfiguration.java | 15 +++++++++++++++
3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c8c583/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index faf5a5c..5d11db9 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -933,6 +933,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
multiple times (zhihai xu via vinayakumarb)
+ HADOOP-12117. Potential NPE from Configuration#loadProperty with
+ allowNullValueProperties set. (zhihai xu via vinayakumarb)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c8c583/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
index 54e07c6..0b45429 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
@@ -2735,14 +2735,14 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
to.put(entry.getKey(), entry.getValue());
}
}
-
+
private void loadProperty(Properties properties, String name, String attr,
String value, boolean finalParameter, String[] source) {
if (value != null || allowNullValueProperties) {
+ if (value == null) {
+ value = DEFAULT_STRING_CHECK;
+ }
if (!finalParameters.contains(attr)) {
- if (value==null && allowNullValueProperties) {
- value = DEFAULT_STRING_CHECK;
- }
properties.setProperty(attr, value);
if(source != null) {
updatingResource.put(attr, source);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c8c583/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
index ec6c964..a039741 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
@@ -42,6 +42,7 @@ import static java.util.concurrent.TimeUnit.*;
import junit.framework.TestCase;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
+
import org.codehaus.jackson.map.ObjectMapper;
import org.mockito.Mockito;
@@ -1511,6 +1513,19 @@ public class TestConfiguration extends TestCase {
// it's expected behaviour.
}
+ public void testNullValueProperties() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setAllowNullValueProperties(true);
+ out = new BufferedWriter(new FileWriter(CONFIG));
+ startConfig();
+ appendProperty("attr", "value", true);
+ appendProperty("attr", "", true);
+ endConfig();
+ Path fileResource = new Path(CONFIG);
+ conf.addResource(fileResource);
+ assertEquals("value", conf.get("attr"));
+ }
+
public static void main(String[] argv) throws Exception {
junit.textui.TestRunner.main(new String[]{
TestConfiguration.class.getName()
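
The fix above hoists the null-value substitution so it runs before the final-parameter check, guaranteeing every path below sees a non-null value; pre-fix, only the non-final branch performed the substitution. A simplified sketch of the corrected control flow, with NULL_SENTINEL standing in for Configuration's DEFAULT_STRING_CHECK:

    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    class PropertyLoaderSketch {
      static final String NULL_SENTINEL = "<null>"; // stand-in for DEFAULT_STRING_CHECK
      private final Set<String> finalParameters = new HashSet<>();
      private final boolean allowNullValueProperties = true;

      void loadProperty(Properties properties, String attr, String value) {
        if (value != null || allowNullValueProperties) {
          // Substitute the sentinel before ANY branch below runs. Pre-fix,
          // the substitution lived inside the !finalParameters branch, so a
          // null value reaching the final-parameter path could cause an NPE.
          if (value == null) {
            value = NULL_SENTINEL;
          }
          if (!finalParameters.contains(attr)) {
            properties.setProperty(attr, value);
          }
        }
      }
    }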
[02/24] hadoop git commit: HDFS-8666. Speedup the TestMover test. Contributed by Walter Su.
Posted by ar...@apache.org.
HDFS-8666. Speedup the TestMover test. Contributed by Walter Su.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/152e5df3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/152e5df3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/152e5df3
Branch: refs/heads/HDFS-7240
Commit: 152e5df3b65394c2939d6de3c4a649a207bb58d3
Parents: b5cdf78
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Jul 1 14:59:50 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Jul 1 14:59:50 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 ++
.../hadoop/hdfs/server/mover/TestMover.java | 19 +++++++++++++++++++
2 files changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/152e5df3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3736932..ec37542 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -694,6 +694,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8635. Migrate HDFS native build to new CMake framework (Alan Burlison
via Colin P. McCabe)
+ HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/152e5df3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 49e2b23..899b5c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
@@ -46,6 +47,21 @@ import org.junit.Assert;
import org.junit.Test;
public class TestMover {
+
+ static final int DEFAULT_BLOCK_SIZE = 100;
+
+ static {
+ TestBalancer.initTestSetup();
+ }
+
+ static void initConf(Configuration conf) {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+ }
+
static Mover newMover(Configuration conf) throws IOException {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
@@ -97,6 +113,7 @@ public class TestMover {
@Test
public void testScheduleBlockWithinSameNode() throws Exception {
final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
@@ -285,6 +302,7 @@ public class TestMover {
public void testTwoReplicaSameStorageTypeShouldNotSelect() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
@@ -361,6 +379,7 @@ public class TestMover {
public void testMoverFailedRetry() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
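
The speedup above is purely configuration: a 100-byte block size plus one-second heartbeat and replication intervals let each mover iteration finish in seconds rather than minutes. A sketch of building such a test configuration with the same DFSConfigKeys used in the diff (the exact values are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class FastMoverConfSketch {
      static final int TINY_BLOCK_SIZE = 100; // bytes; tiny blocks finish moving fast

      static Configuration build() {
        Configuration conf = new Configuration();
        conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, TINY_BLOCK_SIZE);
        conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, TINY_BLOCK_SIZE);
        conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);            // fast DN reports
        conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); // fast NN scans
        conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);     // short moved window
        return conf;
      }
    }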
[19/24] hadoop git commit: HADOOP-11974. Fix FIONREAD #include on Solaris (Alan Burlison via Colin P. McCabe)
Posted by ar...@apache.org.
HADOOP-11974. Fix FIONREAD #include on Solaris (Alan Burlison via Colin P. McCabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81f36443
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81f36443
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81f36443
Branch: refs/heads/HDFS-7240
Commit: 81f364437608b21e85fc393f63546bf8b425ac71
Parents: bf89ddb
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Jul 6 12:56:34 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Mon Jul 6 17:17:59 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++
.../native/src/org/apache/hadoop/net/unix/DomainSocket.c | 9 ++++++++-
2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f36443/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index fab78d4..faf5a5c 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -678,6 +678,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12045. Enable LocalFileSystem#setTimes to change atime.
(Kazuho Fujii via cnauroth)
+ HADOOP-11974. Fix FIONREAD #include on Solaris (Alan Burlison via Colin P.
+ McCabe)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f36443/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
index a3f27ee..e658d8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
@@ -31,7 +31,14 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <sys/ioctl.h> /* for FIONREAD */
+
+/* For FIONREAD */
+#if defined(__sun)
+#include <sys/filio.h>
+#else
+#include <sys/ioctl.h>
+#endif
+
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
[07/24] hadoop git commit: HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)
Posted by ar...@apache.org.
HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff5999d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff5999d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff5999d
Branch: refs/heads/HDFS-7240
Commit: bff5999d07e9416a22846c849487e509ede55040
Parents: 37d7395
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jul 2 16:11:50 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Thu Jul 2 16:11:50 2015 +0530
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 2 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 233 +++++++++++++------
.../hadoop/hdfs/TestDFSClientRetries.java | 2 +-
4 files changed, 169 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ec37542..7b96c56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -696,6 +696,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
+ HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
+ (vinayakumarb)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index f4ceab3..4923a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1181,7 +1181,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
// Get block info from namenode
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
try {
- return new DFSInputStream(this, src, verifyChecksum);
+ return new DFSInputStream(this, src, verifyChecksum, null);
} finally {
scope.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6563d7b..7f3722f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
@@ -94,35 +95,35 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
- private final DFSClient dfsClient;
- private AtomicBoolean closed = new AtomicBoolean(false);
- private final String src;
- private final boolean verifyChecksum;
+ protected final DFSClient dfsClient;
+ protected AtomicBoolean closed = new AtomicBoolean(false);
+ protected final String src;
+ protected final boolean verifyChecksum;
// state by stateful read only:
// (protected by lock on this)
/////
private DatanodeInfo currentNode = null;
- private LocatedBlock currentLocatedBlock = null;
- private long pos = 0;
- private long blockEnd = -1;
+ protected LocatedBlock currentLocatedBlock = null;
+ protected long pos = 0;
+ protected long blockEnd = -1;
private BlockReader blockReader = null;
////
// state shared by stateful and positional read:
// (protected by lock on infoLock)
////
- private LocatedBlocks locatedBlocks = null;
+ protected LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0;
private FileEncryptionInfo fileEncryptionInfo = null;
- private CachingStrategy cachingStrategy;
+ protected CachingStrategy cachingStrategy;
////
- private final ReadStatistics readStatistics = new ReadStatistics();
+ protected final ReadStatistics readStatistics = new ReadStatistics();
// lock for state shared between read and pread
// Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
// (it's OK to acquire this lock when the lock on <this> is held)
- private final Object infoLock = new Object();
+ protected final Object infoLock = new Object();
/**
* Track the ByteBuffers that we have handed out to readers.
@@ -239,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* back to the namenode to get a new list of block locations, and is
* capped at maxBlockAcquireFailures
*/
- private int failures = 0;
+ protected int failures = 0;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@@ -252,24 +253,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
deadNodes.put(dnInfo, dnInfo);
}
- DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
- ) throws IOException, UnresolvedLinkException {
+ DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+ LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
- openInfo();
+ this.locatedBlocks = locatedBlocks;
+ openInfo(false);
}
/**
* Grab the open-file info from namenode
+ * @param refreshLocatedBlocks whether to re-fetch locatedblocks
*/
- void openInfo() throws IOException, UnresolvedLinkException {
+ void openInfo(boolean refreshLocatedBlocks) throws IOException,
+ UnresolvedLinkException {
final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) {
- lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+ lastBlockBeingWrittenLength =
+ fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
@@ -281,7 +286,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(conf.getRetryIntervalForGetLastBlockLength());
- lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+ lastBlockBeingWrittenLength =
+ fetchLocatedBlocksAndGetLastBlockLength(true);
} else {
break;
}
@@ -302,8 +308,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
- final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
+ private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+ throws IOException {
+ LocatedBlocks newInfo = locatedBlocks;
+ if (locatedBlocks == null || refresh) {
+ newInfo = dfsClient.getLocatedBlocks(src, 0);
+ }
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
@@ -441,7 +451,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return located block
* @throws IOException
*/
- private LocatedBlock getBlockAt(long offset) throws IOException {
+ protected LocatedBlock getBlockAt(long offset) throws IOException {
synchronized(infoLock) {
assert (locatedBlocks != null) : "locatedBlocks is null";
@@ -476,7 +486,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Fetch a block from namenode and cache it */
- private void fetchBlockAt(long offset) throws IOException {
+ protected void fetchBlockAt(long offset) throws IOException {
synchronized(infoLock) {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
@@ -579,7 +589,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
// Will be getting a new BlockReader.
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
//
// Connect to best DataNode for desired Block, with potential offset
@@ -620,7 +630,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
- DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
@@ -631,8 +641,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
fetchBlockAt(target);
} else {
connectFailedOnce = true;
- DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block "
- +targetBlock.getBlock()+ ", add to deadNodes and continue. " + ex, ex);
+ DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ + ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
@@ -696,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
super.close();
}
@@ -713,12 +723,22 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Wraps different possible read implementations so that readBuffer can be
* strategy-agnostic.
*/
- private interface ReaderStrategy {
+ interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
+
+ /**
+ * Copy data from the src ByteBuffer into the read buffer.
+ * @param src The src buffer where the data is copied from
+ * @param offset Useful only when the ReadStrategy is based on a byte array.
+ * Indicate the offset of the byte array for copy.
+ * @param length Useful only when the ReadStrategy is based on a byte array.
+ * Indicate the length of the data to copy.
+ */
+ public int copyFrom(ByteBuffer src, int offset, int length);
}
- private void updateReadStatistics(ReadStatistics readStatistics,
+ protected void updateReadStatistics(ReadStatistics readStatistics,
int nRead, BlockReader blockReader) {
if (nRead <= 0) return;
synchronized(infoLock) {
@@ -749,12 +769,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
updateReadStatistics(readStatistics, nRead, blockReader);
return nRead;
}
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ writeSlice.get(buf, offset, length);
+ return length;
+ }
}
/**
* Used to read bytes into a user-supplied ByteBuffer
*/
- private class ByteBufferStrategy implements ReaderStrategy {
+ protected class ByteBufferStrategy implements ReaderStrategy {
final ByteBuffer buf;
ByteBufferStrategy(ByteBuffer buf) {
this.buf = buf;
@@ -770,6 +797,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
int ret = blockReader.read(buf);
success = true;
updateReadStatistics(readStatistics, ret, blockReader);
+ if (ret == 0) {
+ DFSClient.LOG.warn("zero");
+ }
return ret;
} finally {
if (!success) {
@@ -779,6 +809,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
}
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+ writeSlice.limit(writeSlice.position() + remaining);
+ buf.put(writeSlice);
+ return remaining;
+ }
}
/* This is a used by regular read() and handles ChecksumExceptions.
@@ -837,7 +876,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
@@ -926,7 +965,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
* Add corrupted block replica into map.
*/
- private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+ protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
Set<DatanodeInfo> dnSet = null;
if((corruptedBlockMap.containsKey(blk))) {
@@ -985,8 +1024,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
- openInfo();
- block = getBlockAt(block.getStartOffset());
+ openInfo(true);
+ block = refreshLocatedBlock(block);
failures++;
}
}
@@ -998,7 +1037,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node. Null if no node can be chosen.
*/
- private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
@@ -1058,15 +1097,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return errMsgr.toString();
}
- private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- LocatedBlock block = getBlockAt(blockStartOffset);
+ block = refreshLocatedBlock(block);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
try {
- actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+ actualGetFromOneDataNode(addressPair, block, start, end,
buf, offset, corruptedBlockMap);
return;
} catch (IOException e) {
@@ -1077,7 +1116,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
- final long blockStartOffset, final long start, final long end,
+ final LocatedBlock block, final long start, final long end,
final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
@@ -1090,7 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
- actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+ actualGetFromOneDataNode(datanode, block, start, end, buf,
offset, corruptedBlockMap);
return bb;
} finally {
@@ -1100,31 +1139,60 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
};
}
+ /**
+ * Used when reading contiguous blocks
+ */
private void actualGetFromOneDataNode(final DNAddrPair datanode,
- long blockStartOffset, final long start, final long end, byte[] buf,
+ LocatedBlock block, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
+ final int length = (int) (end - start + 1);
+ actualGetFromOneDataNode(datanode, block, start, end, buf,
+ new int[]{offset}, new int[]{length}, corruptedBlockMap);
+ }
+
+ /**
+ * Read data from one DataNode.
+ * @param datanode the datanode from which to read data
+ * @param block the located block containing the requested data
+ * @param startInBlk the startInBlk offset of the block
+ * @param endInBlk the endInBlk offset of the block
+ * @param buf the given byte array into which the data is read
+ * @param offsets the data may be read into multiple segments of the buf
+ * (when reading a striped block). this array indicates the
+ * offset of each buf segment.
+ * @param lengths the length of each buf segment
+ * @param corruptedBlockMap map recording list of datanodes with corrupted
+ * block replica
+ */
+ void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long startInBlk, final long endInBlk,
+ byte[] buf, int[] offsets, int[] lengths,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
+ final int len = (int) (endInBlk - startInBlk + 1);
+ checkReadPortions(offsets, lengths, len);
while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
- LocatedBlock block = getBlockAt(blockStartOffset);
+ block = refreshLocatedBlock(block);
BlockReader reader = null;
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
- int len = (int) (end - start + 1);
- reader = getBlockReader(block, start, len, datanode.addr,
+ reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
- int nread = reader.readAll(buf, offset, len);
- updateReadStatistics(readStatistics, nread, reader);
-
- if (nread != len) {
- throw new IOException("truncated return from reader.read(): " +
- "excpected " + len + ", got " + nread);
+ for (int i = 0; i < offsets.length; i++) {
+ int nread = reader.readAll(buf, offsets[i], lengths[i]);
+ updateReadStatistics(readStatistics, nread, reader);
+ if (nread != lengths[i]) {
+ throw new IOException("truncated return from reader.read(): " +
+ "excpected " + lengths[i] + ", got " + nread);
+ }
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@@ -1169,11 +1237,40 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
- * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+ * Refresh cached block locations.
+ * @param block The currently cached block locations
+ * @return Refreshed block locations
+ * @throws IOException
+ */
+ protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+ throws IOException {
+ return getBlockAt(block.getStartOffset());
+ }
+
+ /**
+ * This method verifies that the read portions are valid and do not overlap
+ * with each other.
+ */
+ private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+ Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
+ int sum = 0;
+ for (int i = 0; i < lengths.length; i++) {
+ if (i > 0) {
+ int gap = offsets[i] - offsets[i - 1];
+ // make sure read portions do not overlap with each other
+ Preconditions.checkArgument(gap >= lengths[i - 1]);
+ }
+ sum += lengths[i];
+ }
+ Preconditions.checkArgument(sum == totalLen);
+ }
+
+ /**
+ * Like {@link #fetchBlockByteRange}except we start up a second, parallel,
* 'hedged' read if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
*/
- private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
+ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@@ -1186,7 +1283,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
ByteBuffer bb = null;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
- LocatedBlock block = getBlockAt(blockStartOffset);
+ block = refreshLocatedBlock(block);
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
@@ -1198,7 +1295,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block.getStartOffset(), start, end, bb,
+ chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable);
@@ -1235,7 +1332,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block.getStartOffset(), start, end, bb,
+ chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable);
@@ -1319,7 +1416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return true if block access token has expired or invalid and it should be
* refetched
*/
- private static boolean tokenRefetchNeeded(IOException ex,
+ protected static boolean tokenRefetchNeeded(IOException ex,
InetSocketAddress targetAddr) {
/*
* Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1389,13 +1486,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
if (dfsClient.isHedgedReadsEnabled()) {
- hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
- targetStart + bytesToRead - 1, buffer, offset,
- corruptedBlockMap);
+ hedgedFetchBlockByteRange(blk, targetStart,
+ targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
} else {
- fetchBlockByteRange(blk.getStartOffset(), targetStart,
- targetStart + bytesToRead - 1, buffer, offset,
- corruptedBlockMap);
+ fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+ buffer, offset, corruptedBlockMap);
}
} finally {
// Check and report if any block replicas are corrupted.
@@ -1427,7 +1522,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas
*/
- private void reportCheckSumFailure(
+ protected void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
@@ -1556,7 +1651,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
*/
@Override
- public synchronized long getPos() throws IOException {
+ public synchronized long getPos() {
return pos;
}
@@ -1590,7 +1685,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Utility class to encapsulate data node info and its address. */
- private static final class DNAddrPair {
+ static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
@@ -1627,7 +1722,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private void closeCurrentBlockReader() {
+ protected void closeCurrentBlockReaders() {
if (blockReader == null) return;
// Close the current block reader so that the new caching settings can
// take effect immediately.
@@ -1647,7 +1742,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
@Override
@@ -1657,7 +1752,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
/**
@@ -1815,6 +1910,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override
public synchronized void unbuffer() {
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 68cc155..43e0eb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -343,7 +343,7 @@ public class TestDFSClientRetries {
// we're starting a new operation on the user level.
doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
.when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
- is.openInfo();
+ is.openInfo(true);
// Seek to beginning forces a reopen of the BlockReader - otherwise it'll
// just keep reading on the existing stream and the fact that we've poisoned
// the block info won't do anything.
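
A central piece of the refactor above is generalizing single-segment reads into multiple (offset, length) portions of one buffer, as striped reads require, with checkReadPortions guarding that the portions are well formed: matching array lengths, no overlap, and lengths summing to the total. A standalone sketch of that validation, following the logic in the diff but using plain exceptions in place of Guava's Preconditions:

    class ReadPortionsSketch {
      /**
       * Verify that the (offsets[i], lengths[i]) segments are non-empty,
       * non-overlapping in ascending order, and cover exactly totalLen bytes.
       */
      static void check(int[] offsets, int[] lengths, int totalLen) {
        if (offsets.length != lengths.length || offsets.length == 0) {
          throw new IllegalArgumentException("offsets/lengths mismatch");
        }
        int sum = 0;
        for (int i = 0; i < lengths.length; i++) {
          if (i > 0 && offsets[i] - offsets[i - 1] < lengths[i - 1]) {
            // previous segment would run into this one
            throw new IllegalArgumentException("segments overlap at index " + i);
          }
          sum += lengths[i];
        }
        if (sum != totalLen) {
          throw new IllegalArgumentException(
              "segments cover " + sum + " bytes, expected " + totalLen);
        }
      }
    }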
[09/24] hadoop git commit: HADOOP-12173. NetworkTopology::add calls toString always. Contributed by Inigo Goiri
Posted by ar...@apache.org.
HADOOP-12173. NetworkTopology::add calls toString always. Contributed by Inigo Goiri
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e59f6fad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e59f6fad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e59f6fad
Branch: refs/heads/HDFS-7240
Commit: e59f6fad6a8849cfab6acbf012f338d9cc7dd63c
Parents: 5fddc51
Author: Chris Douglas <cd...@apache.org>
Authored: Thu Jul 2 21:39:48 2015 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Thu Jul 2 21:39:48 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/hadoop/net/NetworkTopology.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e59f6fad/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index c60cc0b..63b6763 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -396,14 +396,13 @@ public class NetworkTopology {
int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
netlock.writeLock().lock();
try {
- String oldTopoStr = this.toString();
if( node instanceof InnerNode ) {
throw new IllegalArgumentException(
"Not allow to add an inner node: "+NodeBase.getPath(node));
}
if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
LOG.error("Error: can't add leaf node " + NodeBase.getPath(node) +
- " at depth " + newDepth + " to topology:\n" + oldTopoStr);
+ " at depth " + newDepth + " to topology:\n" + this.toString());
throw new InvalidTopologyException("Failed to add " + NodeBase.getPath(node) +
": You cannot have a rack and a non-rack node at the same " +
"level of the network topology.");
[22/24] hadoop git commit: YARN-2194. Fix bug causing CGroups functionality to fail on RHEL7. Contributed by Wei Yan.
Posted by ar...@apache.org.
YARN-2194. Fix bug causing CGroups functionality to fail on RHEL7. Contributed by Wei Yan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c40bdb56
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c40bdb56
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c40bdb56
Branch: refs/heads/HDFS-7240
Commit: c40bdb56a79fe1499c2284d493edc84620c0c078
Parents: 99c8c58
Author: Varun Vasudev <vv...@apache.org>
Authored: Tue Jul 7 16:59:29 2015 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Tue Jul 7 16:59:29 2015 +0530
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/util/StringUtils.java | 8 ++++++++
.../java/org/apache/hadoop/util/TestStringUtils.java | 4 ++++
hadoop-yarn-project/CHANGES.txt | 3 +++
.../server/nodemanager/LinuxContainerExecutor.java | 12 ++++++++----
.../linux/privileged/PrivilegedOperation.java | 1 +
.../linux/privileged/PrivilegedOperationExecutor.java | 2 +-
.../nodemanager/util/CgroupsLCEResourcesHandler.java | 6 ++++--
.../native/container-executor/impl/configuration.c | 8 ++++----
.../container-executor/test/test-container-executor.c | 4 ++--
.../TestLinuxContainerExecutorWithMocks.java | 13 +++++++++----
10 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index fc4b0ab..73f9c4f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -872,6 +872,10 @@ public class StringUtils {
return sb.toString();
}
+ public static String join(char separator, Iterable<?> strings) {
+ return join(separator + "", strings);
+ }
+
/**
* Concatenates strings, using a separator.
*
@@ -894,6 +898,10 @@ public class StringUtils {
return sb.toString();
}
+ public static String join(char separator, String[] strings) {
+ return join(separator + "", strings);
+ }
+
/**
* Convert SOME_STUFF to SomeStuff
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
index 85ab8c4..e3e613c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
@@ -278,8 +278,12 @@ public class TestStringUtils extends UnitTestcaseTimeLimit {
s.add("c");
assertEquals("", StringUtils.join(":", s.subList(0, 0)));
assertEquals("a", StringUtils.join(":", s.subList(0, 1)));
+ assertEquals("", StringUtils.join(':', s.subList(0, 0)));
+ assertEquals("a", StringUtils.join(':', s.subList(0, 1)));
assertEquals("a:b", StringUtils.join(":", s.subList(0, 2)));
assertEquals("a:b:c", StringUtils.join(":", s.subList(0, 3)));
+ assertEquals("a:b", StringUtils.join(':', s.subList(0, 2)));
+ assertEquals("a:b:c", StringUtils.join(':', s.subList(0, 3)));
}
@Test (timeout = 30000)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8f63187..2d1d6a2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -595,6 +595,9 @@ Release 2.8.0 - UNRELEASED
YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong
prefix for auth options. (Bibin A Chundatt via devaraj)
+ YARN-2194. Fix bug causing CGroups functionality to fail on RHEL7.
+ (Wei Yan via vvasudev)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index dbe257d..b936969 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -241,8 +241,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
appId,
nmPrivateContainerTokensPath.toUri().getPath().toString(),
- StringUtils.join(",", localDirs),
- StringUtils.join(",", logDirs)));
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ localDirs),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ logDirs)));
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
@@ -363,8 +365,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
nmPrivateContainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString(),
pidFilePath.toString(),
- StringUtils.join(",", localDirs),
- StringUtils.join(",", logDirs),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ localDirs),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ logDirs),
resourcesOptions));
if (tcCommandFile != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
index 74556a8..f220cbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
@@ -36,6 +36,7 @@ import java.util.List;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class PrivilegedOperation {
+ public final static char LINUX_FILE_PATH_SEPARATOR = '%';
public enum OperationType {
CHECK_SETUP("--checksetup"),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
index 1c4a51c..6fe0f5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
@@ -234,7 +234,7 @@ public class PrivilegedOperationExecutor {
if (noneArgsOnly == false) {
//We have already appended at least one tasks file.
- finalOpArg.append(",");
+ finalOpArg.append(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR);
finalOpArg.append(tasksFile);
} else {
finalOpArg.append(tasksFile);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
index b38e559..6994fc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -409,10 +410,11 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
if (isCpuWeightEnabled()) {
sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks");
- sb.append(",");
+ sb.append(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR);
}
- if (sb.charAt(sb.length() - 1) == ',') {
+ if (sb.charAt(sb.length() - 1) ==
+ PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR) {
sb.deleteCharAt(sb.length() - 1);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
index 15b53ae..51adc97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
@@ -283,7 +283,7 @@ char * get_value(const char* key) {
/**
* Function to return an array of values for a key.
- * Value delimiter is assumed to be a comma.
+ * Value delimiter is assumed to be a '%'.
*/
char ** get_values(const char * key) {
char *value = get_value(key);
@@ -291,7 +291,7 @@ char ** get_values(const char * key) {
}
/**
- * Extracts array of values from the comma separated list of values.
+ * Extracts array of values from the '%' separated list of values.
*/
char ** extract_values(char *value) {
char ** toPass = NULL;
@@ -303,14 +303,14 @@ char ** extract_values(char *value) {
//first allocate any array of 10
if(value != NULL) {
toPass = (char **) malloc(sizeof(char *) * toPassSize);
- tempTok = strtok_r((char *)value, ",", &tempstr);
+ tempTok = strtok_r((char *)value, "%", &tempstr);
while (tempTok != NULL) {
toPass[size++] = tempTok;
if(size == toPassSize) {
toPassSize += MAX_SIZE;
toPass = (char **) realloc(toPass,(sizeof(char *) * toPassSize));
}
- tempTok = strtok_r(NULL, ",", &tempstr);
+ tempTok = strtok_r(NULL, "%", &tempstr);
}
}
if (toPass != NULL) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
index be6cc49..13604f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
@@ -30,8 +30,8 @@
#define TEST_ROOT "/tmp/test-container-executor"
#define DONT_TOUCH_FILE "dont-touch-me"
-#define NM_LOCAL_DIRS TEST_ROOT "/local-1," TEST_ROOT "/local-2," \
- TEST_ROOT "/local-3," TEST_ROOT "/local-4," TEST_ROOT "/local-5"
+#define NM_LOCAL_DIRS TEST_ROOT "/local-1%" TEST_ROOT "/local-2%" \
+ TEST_ROOT "/local-3%" TEST_ROOT "/local-4%" TEST_ROOT "/local-5"
#define NM_LOG_DIRS TEST_ROOT "/logs/userlogs"
#define ARRAY_SIZE 1000
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
index d48ce13..82b7fd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -148,8 +149,10 @@ public class TestLinuxContainerExecutorWithMocks {
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, appId, containerId,
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
- StringUtils.join(",", dirsHandler.getLocalDirs()),
- StringUtils.join(",", dirsHandler.getLogDirs()), "cgroups=none"),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ dirsHandler.getLocalDirs()),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ dirsHandler.getLogDirs()), "cgroups=none"),
readMockParams());
}
@@ -312,8 +315,10 @@ public class TestLinuxContainerExecutorWithMocks {
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, appId, containerId,
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
- StringUtils.join(",", dirsHandler.getLocalDirs()),
- StringUtils.join(",", dirsHandler.getLogDirs()),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ dirsHandler.getLocalDirs()),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ dirsHandler.getLogDirs()),
"cgroups=none"), readMockParams());
}
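The root cause YARN-2194 works around: on RHEL7, systemd typically co-mounts cgroup controllers at combined paths such as /sys/fs/cgroup/cpu,cpuacct, so a tasks-file path can itself contain a comma and a comma-delimited path list can no longer be split back unambiguously. The patch therefore switches the delimiter to '%' (PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR) and adds the char-separator join() overloads. A self-contained sketch of the effect, using java.lang.String.join rather than Hadoop's StringUtils, with illustrative paths:

import java.util.Arrays;
import java.util.List;

public class SeparatorDemo {
  // '%' mirrors PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR in the patch.
  static final char SEPARATOR = '%';

  public static void main(String[] args) {
    // A RHEL7-style co-mounted controller path contains a comma, so a
    // comma-delimited list of such paths would be ambiguous to split.
    List<String> tasksFiles = Arrays.asList(
        "/sys/fs/cgroup/cpu,cpuacct/hadoop-yarn/container_1/tasks",
        "/sys/fs/cgroup/cpu,cpuacct/hadoop-yarn/container_2/tasks");
    System.out.println(String.join(String.valueOf(SEPARATOR), tasksFiles));
  }
}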
[13/24] hadoop git commit: HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu)
Posted by ar...@apache.org.
HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/233cab89
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/233cab89
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/233cab89
Branch: refs/heads/HDFS-7240
Commit: 233cab89adb6bae21d7e171f2af516b92266242c
Parents: bff67df
Author: Vinayakumar B <vi...@apache.org>
Authored: Mon Jul 6 15:39:43 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Mon Jul 6 15:39:43 2015 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 ++
.../apache/hadoop/ha/ActiveStandbyElector.java | 20 +++++++++++--
.../hadoop/ha/TestActiveStandbyElector.java | 31 ++++++++++++++++++++
3 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233cab89/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 312a996..1d737e5 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -924,6 +924,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12164. Fix TestMove and TestFsShellReturnCode failed to get command
name using reflection. (Lei (Eddy) Xu)
+ HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
+ multiple times (zhihai xu via vinayakumarb)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233cab89/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
index e520a16..e458181 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
@@ -173,7 +173,9 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
private Lock sessionReestablishLockForTests = new ReentrantLock();
private boolean wantToBeInElection;
-
+ private boolean monitorLockNodePending = false;
+ private ZooKeeper monitorLockNodeClient;
+
/**
* Create a new ActiveStandbyElector object <br/>
* The elector is created by providing to it the Zookeeper configuration, the
@@ -468,7 +470,8 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
public synchronized void processResult(int rc, String path, Object ctx,
Stat stat) {
if (isStaleClient(ctx)) return;
-
+ monitorLockNodePending = false;
+
assert wantToBeInElection :
"Got a StatNode result after quitting election";
@@ -744,6 +747,11 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
return state;
}
+ @VisibleForTesting
+ synchronized boolean isMonitorLockNodePending() {
+ return monitorLockNodePending;
+ }
+
private boolean reEstablishSession() {
int connectionRetryCount = 0;
boolean success = false;
@@ -949,7 +957,13 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
}
private void monitorLockNodeAsync() {
- zkClient.exists(zkLockFilePath,
+ if (monitorLockNodePending && monitorLockNodeClient == zkClient) {
+ LOG.info("Ignore duplicate monitor lock-node request.");
+ return;
+ }
+ monitorLockNodePending = true;
+ monitorLockNodeClient = zkClient;
+ zkClient.exists(zkLockFilePath,
watcher, this,
zkClient);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233cab89/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
index 2e578e2..83a3a4f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
@@ -452,6 +452,10 @@ public class TestActiveStandbyElector {
Event.KeeperState.SyncConnected);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(1);
+ Assert.assertTrue(elector.isMonitorLockNodePending());
+ elector.processResult(Code.SESSIONEXPIRED.intValue(), ZK_LOCK_NAME,
+ mockZK, new Stat());
+ Assert.assertFalse(elector.isMonitorLockNodePending());
// session expired should enter safe mode and initiate re-election
// re-election checked via checking re-creation of new zookeeper and
@@ -495,6 +499,13 @@ public class TestActiveStandbyElector {
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1);
+ Assert.assertTrue(elector.isMonitorLockNodePending());
+
+ Stat stat = new Stat();
+ stat.setEphemeralOwner(0L);
+ Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+ Assert.assertFalse(elector.isMonitorLockNodePending());
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
@@ -504,12 +515,18 @@ public class TestActiveStandbyElector {
Event.EventType.NodeDataChanged);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(2);
+ Assert.assertTrue(elector.isMonitorLockNodePending());
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+ Assert.assertFalse(elector.isMonitorLockNodePending());
// monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeChildrenChanged);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(3);
+ Assert.assertTrue(elector.isMonitorLockNodePending());
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+ Assert.assertFalse(elector.isMonitorLockNodePending());
// lock node deletion when in standby mode should create znode again
// successful znode creation enters active state and sets monitor
@@ -524,6 +541,10 @@ public class TestActiveStandbyElector {
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(4);
+ Assert.assertTrue(elector.isMonitorLockNodePending());
+ stat.setEphemeralOwner(1L);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+ Assert.assertFalse(elector.isMonitorLockNodePending());
// lock node deletion in active mode should enter neutral mode and create
// znode again successful znode creation enters active state and sets
@@ -538,6 +559,9 @@ public class TestActiveStandbyElector {
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
verifyExistCall(5);
+ Assert.assertTrue(elector.isMonitorLockNodePending());
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+ Assert.assertFalse(elector.isMonitorLockNodePending());
// bad path name results in fatal error
Mockito.when(mockEvent.getPath()).thenReturn(null);
@@ -570,6 +594,13 @@ public class TestActiveStandbyElector {
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1);
+ Assert.assertTrue(elector.isMonitorLockNodePending());
+
+ Stat stat = new Stat();
+ stat.setEphemeralOwner(0L);
+ Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+ Assert.assertFalse(elector.isMonitorLockNodePending());
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
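The guard added above is a pending-flag pattern for asynchronous calls: remember that a watch request is outstanding and for which ZooKeeper session, and drop duplicates until the StatCallback clears the flag. A compact sketch of just that pattern (Elector and Object stand in for ActiveStandbyElector and org.apache.zookeeper.ZooKeeper; this is not code from the patch):

class Elector {
  private Object monitorClient;    // session the pending request was issued on
  private boolean monitorPending;  // an exists() callback is still in flight

  synchronized void monitorAsync(Object zkClient) {
    if (monitorPending && monitorClient == zkClient) {
      return;  // duplicate request on the same session; ignore it
    }
    monitorPending = true;
    monitorClient = zkClient;
    // the real code issues zkClient.exists(path, watcher, this, zkClient) here
  }

  synchronized void processResult(Object zkClient) {
    monitorPending = false;  // callback arrived; the next watch may be set
  }
}

Comparing the session object as well means a re-established session is never blocked by a flag left over from a dead client.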
[16/24] hadoop git commit: HADOOP-12185. NetworkTopology is not efficient adding/getting/removing nodes. Contributed by Inigo Goiri
Posted by ar...@apache.org.
HADOOP-12185. NetworkTopology is not efficient adding/getting/removing nodes. Contributed by Inigo Goiri
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47a69ec7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47a69ec7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47a69ec7
Branch: refs/heads/HDFS-7240
Commit: 47a69ec7a5417cb56b75d07184dd6888ff068302
Parents: ed1e3ce
Author: Chris Douglas <cd...@apache.org>
Authored: Mon Jul 6 15:03:22 2015 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Jul 6 15:03:22 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/net/NetworkTopology.java | 59 +++++++--------
.../apache/hadoop/net/TestClusterTopology.java | 75 ++++++++++++++++++--
2 files changed, 102 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47a69ec7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 63b6763..970ad40 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.net;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
@@ -80,6 +82,7 @@ public class NetworkTopology {
*/
static class InnerNode extends NodeBase {
protected List<Node> children=new ArrayList<Node>();
+ private Map<String, Node> childrenMap = new HashMap<String, Node>();
private int numOfLeaves;
/** Construct an InnerNode from a path-like string */
@@ -171,10 +174,13 @@ public class NetworkTopology {
// this node is the parent of n; add n directly
n.setParent(this);
n.setLevel(this.level+1);
- for(int i=0; i<children.size(); i++) {
- if (children.get(i).getName().equals(n.getName())) {
- children.set(i, n);
- return false;
+ Node prev = childrenMap.put(n.getName(), n);
+ if (prev != null) {
+ for(int i=0; i<children.size(); i++) {
+ if (children.get(i).getName().equals(n.getName())) {
+ children.set(i, n);
+ return false;
+ }
}
}
children.add(n);
@@ -183,17 +189,12 @@ public class NetworkTopology {
} else {
// find the next ancestor node
String parentName = getNextAncestorName(n);
- InnerNode parentNode = null;
- for(int i=0; i<children.size(); i++) {
- if (children.get(i).getName().equals(parentName)) {
- parentNode = (InnerNode)children.get(i);
- break;
- }
- }
+ InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
if (parentNode == null) {
// create a new InnerNode
parentNode = createParentNode(parentName);
children.add(parentNode);
+ childrenMap.put(parentNode.getName(), parentNode);
}
// add n to the subtree of the next ancestor node
if (parentNode.add(n)) {
@@ -234,12 +235,15 @@ public class NetworkTopology {
+parent+", is not a descendent of "+currentPath);
if (isParent(n)) {
// this node is the parent of n; remove n directly
- for(int i=0; i<children.size(); i++) {
- if (children.get(i).getName().equals(n.getName())) {
- children.remove(i);
- numOfLeaves--;
- n.setParent(null);
- return true;
+ if (childrenMap.containsKey(n.getName())) {
+ for (int i=0; i<children.size(); i++) {
+ if (children.get(i).getName().equals(n.getName())) {
+ children.remove(i);
+ childrenMap.remove(n.getName());
+ numOfLeaves--;
+ n.setParent(null);
+ return true;
+ }
}
}
return false;
@@ -262,7 +266,8 @@ public class NetworkTopology {
// if the parent node has no children, remove the parent node too
if (isRemoved) {
if (parentNode.getNumOfChildren() == 0) {
- children.remove(i);
+ Node prev = children.remove(i);
+ childrenMap.remove(prev.getName());
}
numOfLeaves--;
}
@@ -279,12 +284,7 @@ public class NetworkTopology {
if (loc == null || loc.length() == 0) return this;
String[] path = loc.split(PATH_SEPARATOR_STR, 2);
- Node childnode = null;
- for(int i=0; i<children.size(); i++) {
- if (children.get(i).getName().equals(path[0])) {
- childnode = children.get(i);
- }
- }
+ Node childnode = childrenMap.get(path[0]);
if (childnode == null) return null; // non-existing node
if (path.length == 1) return childnode;
if (childnode instanceof InnerNode) {
@@ -311,10 +311,13 @@ public class NetworkTopology {
isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
if (isLeafParent()) { // children are leaves
if (isLeaf) { // excluded node is a leaf node
- int excludedIndex = children.indexOf(excludedNode);
- if (excludedIndex != -1 && leafIndex >= 0) {
- // excluded node is one of the children so adjust the leaf index
- leafIndex = leafIndex>=excludedIndex ? leafIndex+1 : leafIndex;
+ if (excludedNode != null &&
+ childrenMap.containsKey(excludedNode.getName())) {
+ int excludedIndex = children.indexOf(excludedNode);
+ if (excludedIndex != -1 && leafIndex >= 0) {
+ // excluded node is one of the children so adjust the leaf index
+ leafIndex = leafIndex>=excludedIndex ? leafIndex+1 : leafIndex;
+ }
}
}
// range check
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47a69ec7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
index 3ab663f..72fc5cb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.net;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import org.apache.commons.math3.stat.inference.ChiSquareTest;
import org.junit.Assert;
import org.junit.Test;
@@ -79,12 +81,14 @@ public class TestClusterTopology extends Assert {
public void testCountNumNodes() throws Exception {
// create the topology
NetworkTopology cluster = new NetworkTopology();
- cluster.add(getNewNode("node1", "/d1/r1"));
+ NodeElement node1 = getNewNode("node1", "/d1/r1");
+ cluster.add(node1);
NodeElement node2 = getNewNode("node2", "/d1/r2");
cluster.add(node2);
- cluster.add(getNewNode("node3", "/d1/r3"));
- NodeElement node3 = getNewNode("node4", "/d1/r4");
+ NodeElement node3 = getNewNode("node3", "/d1/r3");
cluster.add(node3);
+ NodeElement node4 = getNewNode("node4", "/d1/r4");
+ cluster.add(node4);
// create exclude list
List<Node> excludedNodes = new ArrayList<Node>();
@@ -95,7 +99,7 @@ public class TestClusterTopology extends Assert {
assertEquals("4 nodes should be available with extra excluded Node", 4,
cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
// add one existing node to exclude list
- excludedNodes.add(node3);
+ excludedNodes.add(node4);
assertEquals("excluded nodes with ROOT scope should be considered", 3,
cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
assertEquals("excluded nodes without ~ scope should be considered", 2,
@@ -112,6 +116,69 @@ public class TestClusterTopology extends Assert {
// getting count with non-exist scope.
assertEquals("No nodes should be considered for non-exist scope", 0,
cluster.countNumOfAvailableNodes("/non-exist", excludedNodes));
+ // remove a node from the cluster
+ cluster.remove(node1);
+ assertEquals("1 node should be available", 1,
+ cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
+ }
+
+ /**
+ * Test how well we pick random nodes.
+ */
+ @Test
+ public void testChooseRandom() {
+ // create the topology
+ NetworkTopology cluster = new NetworkTopology();
+ NodeElement node1 = getNewNode("node1", "/d1/r1");
+ cluster.add(node1);
+ NodeElement node2 = getNewNode("node2", "/d1/r2");
+ cluster.add(node2);
+ NodeElement node3 = getNewNode("node3", "/d1/r3");
+ cluster.add(node3);
+ NodeElement node4 = getNewNode("node4", "/d1/r3");
+ cluster.add(node4);
+
+ // Number of iterations to do the test
+ int numIterations = 100;
+
+ // Pick random nodes
+ HashMap<String,Integer> histogram = new HashMap<String,Integer>();
+ for (int i=0; i<numIterations; i++) {
+ String randomNode = cluster.chooseRandom(NodeBase.ROOT).getName();
+ if (!histogram.containsKey(randomNode)) {
+ histogram.put(randomNode, 0);
+ }
+ histogram.put(randomNode, histogram.get(randomNode) + 1);
+ }
+ assertEquals("Random is not selecting all nodes", 4, histogram.size());
+
+ // Check with 99% confidence (alpha = 0.01, since confidence = 100 * (1 - alpha))
+ ChiSquareTest chiSquareTest = new ChiSquareTest();
+ double[] expected = new double[histogram.size()];
+ long[] observed = new long[histogram.size()];
+ int j=0;
+ for (Integer occurrence : histogram.values()) {
+ expected[j] = 1.0 * numIterations / histogram.size();
+ observed[j] = occurrence;
+ j++;
+ }
+ boolean chiSquareTestRejected =
+ chiSquareTest.chiSquareTest(expected, observed, 0.01);
+
+ // Check that they have the proper distribution
+ assertFalse("Not choosing nodes randomly", chiSquareTestRejected);
+
+ // Pick random nodes excluding the 2 nodes in /d1/r3
+ histogram = new HashMap<String,Integer>();
+ for (int i=0; i<numIterations; i++) {
+ String randomNode = cluster.chooseRandom("~/d1/r3").getName();
+ if (!histogram.containsKey(randomNode)) {
+ histogram.put(randomNode, 0);
+ }
+ histogram.put(randomNode, histogram.get(randomNode) + 1);
+ }
+ assertEquals("Random is not selecting the nodes it should",
+ 2, histogram.size());
}
private NodeElement getNewNode(String name, String rackLocation) {
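The efficiency win comes from pairing the ordered children list with a childrenMap keyed by node name, so membership checks and getLoc() lookups drop from linear scans to O(1). A minimal sketch of keeping such a pair in sync (simplified to String entries; not code from the patch):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Children {
  private final List<String> list = new ArrayList<>();      // preserves order
  private final Map<String, String> byName = new HashMap<>(); // O(1) lookup

  boolean add(String name) {
    if (byName.containsKey(name)) {
      return false;       // O(1) duplicate check replaces the linear scan
    }
    byName.put(name, name);
    list.add(name);
    return true;
  }

  boolean remove(String name) {
    if (byName.remove(name) == null) {
      return false;       // O(1) miss instead of scanning the whole list
    }
    list.remove(name);    // still linear, but only runs on confirmed hits
    return true;
  }

  String get(String name) {
    return byName.get(name);
  }
}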
[18/24] hadoop git commit: Release process for 2.7.1: Set the release date for 2.7.1.
Posted by ar...@apache.org.
Release process for 2.7.1: Set the release date for 2.7.1.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bf89ddb9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bf89ddb9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bf89ddb9
Branch: refs/heads/HDFS-7240
Commit: bf89ddb9b8ca27a34074b415f85599dd48b8fc50
Parents: d62b63d
Author: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vi...@apache.org>
Authored: Mon Jul 6 16:39:12 2015 -0700
Committer: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vi...@apache.org>
Committed: Mon Jul 6 16:39:59 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +-
hadoop-mapreduce-project/CHANGES.txt | 2 +-
hadoop-yarn-project/CHANGES.txt | 2 +-
4 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index f2f9d5c..fab78d4 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -942,7 +942,7 @@ Release 2.7.2 - UNRELEASED
BUG FIXES
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d264f74..e40ea3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1024,7 +1024,7 @@ Release 2.7.2 - UNRELEASED
BUG FIXES
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2458403..4c60174 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -545,7 +545,7 @@ Release 2.7.2 - UNRELEASED
MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo
if mapId is not in the cache. (zhihai xu via devaraj)
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 803d725..f35be75 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -610,7 +610,7 @@ Release 2.7.2 - UNRELEASED
YARN-3508. Prevent processing preemption events on the main RM dispatcher.
(Varun Saxena via wangda)
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
[10/24] hadoop git commit: HDFS-8577. Avoid retrying to recover lease on a file which does not exist (Contributed by J.Andreina)
Posted by ar...@apache.org.
HDFS-8577. Avoid retrying to recover lease on a file which does not exist (Contributed by J.Andreina)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2eae130a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2eae130a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2eae130a
Branch: refs/heads/HDFS-7240
Commit: 2eae130ab9edd318c82503c2306f610f2b5a3e51
Parents: e59f6fa
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri Jul 3 13:35:48 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Fri Jul 3 13:35:48 2015 +0530
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++
.../java/org/apache/hadoop/hdfs/tools/DebugAdmin.java | 10 ++++++++--
.../java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java | 8 ++++++++
3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2eae130a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6678a3e..4f184fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1002,6 +1002,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8706. Fix typo in datanode startup options in HDFSCommands.html.
(Brahma Reddy Battula via Arpit Agarwal)
+ HDFS-8577. Avoid retrying to recover lease on a file which does not exist
+ (J.Andreina via vinayakumarb)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2eae130a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
index 41f1ca4..d179a5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.tools;
import java.io.DataInputStream;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -254,6 +255,11 @@ public class DebugAdmin extends Configured implements Tool {
IOException ioe = null;
try {
recovered = dfs.recoverLease(new Path(pathStr));
+ } catch (FileNotFoundException e) {
+ System.err.println("recoverLease got exception: " + e.getMessage());
+ System.err.println("Giving up on recoverLease for " + pathStr +
+ " after 1 try");
+ return 1;
} catch (IOException e) {
ioe = e;
}
@@ -262,8 +268,8 @@ public class DebugAdmin extends Configured implements Tool {
return 0;
}
if (ioe != null) {
- System.err.println("recoverLease got exception: ");
- ioe.printStackTrace();
+ System.err.println("recoverLease got exception: " +
+ ioe.getMessage());
} else {
System.err.println("recoverLease returned false.");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2eae130a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
index 52b194d..07f70e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
@@ -37,6 +37,7 @@ import java.io.PrintStream;
import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestDebugAdmin {
private MiniDFSCluster cluster;
@@ -116,4 +117,11 @@ public class TestDebugAdmin {
"-block", blockFile.getAbsolutePath()})
);
}
+
+ @Test(timeout = 60000)
+ public void testRecoverLeaseforFileNotFound() throws Exception {
+ assertTrue(runCmd(new String[] {
+ "recoverLease", "-path", "/foo", "-retries", "2" }).contains(
+ "Giving up on recoverLease for /foo after 1 try"));
+ }
}
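The fix relies on catch-clause ordering: FileNotFoundException is a subclass of IOException, so catching it first lets the tool fail fast instead of spending all of its recoverLease retries on a file that cannot exist. A runnable sketch of the shape of that loop (names illustrative, not the DebugAdmin code):

import java.io.FileNotFoundException;
import java.io.IOException;

public class RetryDemo {
  static int recover(String path, int retries) {
    for (int i = 0; i < retries; i++) {
      try {
        attempt(path);
        return 0;
      } catch (FileNotFoundException e) {
        // The more specific catch must come first; no point retrying.
        System.err.println("giving up on " + path + ": " + e.getMessage());
        return 1;
      } catch (IOException e) {
        System.err.println("retrying after: " + e.getMessage());
      }
    }
    return 1;
  }

  static void attempt(String path) throws IOException {
    // Stand-in for dfs.recoverLease(new Path(path)).
    throw new FileNotFoundException(path + " does not exist");
  }

  public static void main(String[] args) {
    System.exit(recover("/foo", 2));
  }
}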
[11/24] hadoop git commit: YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner after create them. Contributed by zhihai xu
Posted by ar...@apache.org.
YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner after create them. Contributed by zhihai xu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/688617d6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/688617d6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/688617d6
Branch: refs/heads/HDFS-7240
Commit: 688617d6d7e6377a37682b5676b805cc6e8cf3f0
Parents: 2eae130
Author: Xuan <xg...@apache.org>
Authored: Sat Jul 4 21:51:58 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Sat Jul 4 21:51:58 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../logaggregation/AggregatedLogFormat.java | 83 +++++++++++---------
2 files changed, 49 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/688617d6/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2009b47..803d725 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -589,6 +589,9 @@ Release 2.8.0 - UNRELEASED
YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id
properly in log. (Bibin A Chundatt via devaraj)
+ YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner
+ after create them. (zhihai xu via xgong)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/688617d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index debe770..c9453b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -489,18 +489,23 @@ public class AggregatedLogFormat {
* @throws IOException
*/
public String getApplicationOwner() throws IOException {
- TFile.Reader.Scanner ownerScanner = reader.createScanner();
- LogKey key = new LogKey();
- while (!ownerScanner.atEnd()) {
- TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
- key.readFields(entry.getKeyStream());
- if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
- DataInputStream valueStream = entry.getValueStream();
- return valueStream.readUTF();
+ TFile.Reader.Scanner ownerScanner = null;
+ try {
+ ownerScanner = reader.createScanner();
+ LogKey key = new LogKey();
+ while (!ownerScanner.atEnd()) {
+ TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
+ key.readFields(entry.getKeyStream());
+ if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
+ DataInputStream valueStream = entry.getValueStream();
+ return valueStream.readUTF();
+ }
+ ownerScanner.advance();
}
- ownerScanner.advance();
+ return null;
+ } finally {
+ IOUtils.cleanup(LOG, ownerScanner);
}
- return null;
}
/**
@@ -513,38 +518,42 @@ public class AggregatedLogFormat {
public Map<ApplicationAccessType, String> getApplicationAcls()
throws IOException {
// TODO Seek directly to the key once a comparator is specified.
- TFile.Reader.Scanner aclScanner = reader.createScanner();
- LogKey key = new LogKey();
- Map<ApplicationAccessType, String> acls =
- new HashMap<ApplicationAccessType, String>();
- while (!aclScanner.atEnd()) {
- TFile.Reader.Scanner.Entry entry = aclScanner.entry();
- key.readFields(entry.getKeyStream());
- if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
- DataInputStream valueStream = entry.getValueStream();
- while (true) {
- String appAccessOp = null;
- String aclString = null;
- try {
- appAccessOp = valueStream.readUTF();
- } catch (EOFException e) {
- // Valid end of stream.
- break;
- }
- try {
- aclString = valueStream.readUTF();
- } catch (EOFException e) {
- throw new YarnRuntimeException("Error reading ACLs", e);
+ TFile.Reader.Scanner aclScanner = null;
+ try {
+ aclScanner = reader.createScanner();
+ LogKey key = new LogKey();
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>();
+ while (!aclScanner.atEnd()) {
+ TFile.Reader.Scanner.Entry entry = aclScanner.entry();
+ key.readFields(entry.getKeyStream());
+ if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
+ DataInputStream valueStream = entry.getValueStream();
+ while (true) {
+ String appAccessOp = null;
+ String aclString = null;
+ try {
+ appAccessOp = valueStream.readUTF();
+ } catch (EOFException e) {
+ // Valid end of stream.
+ break;
+ }
+ try {
+ aclString = valueStream.readUTF();
+ } catch (EOFException e) {
+ throw new YarnRuntimeException("Error reading ACLs", e);
+ }
+ acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
}
- acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
}
-
+ aclScanner.advance();
}
- aclScanner.advance();
+ return acls;
+ } finally {
+ IOUtils.cleanup(LOG, aclScanner);
}
- return acls;
}
-
+
/**
* Read the next key and return the value-stream.
*
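Both getters above had the same leak: the scan loop returns from its middle, so a scanner created outside try/finally was never closed on the early-return path. A minimal sketch of the fix's shape; the patch itself uses IOUtils.cleanup(LOG, scanner), which also logs and swallows close() failures, whereas this simplified version calls close() directly (Scanner is stood in for by Closeable):

import java.io.Closeable;
import java.io.IOException;

class OwnerLookup {
  static String find(Closeable scanner, String[] keys) throws IOException {
    try {
      for (String key : keys) {
        if ("APPLICATION_OWNER".equals(key)) {
          return key;     // early return still triggers the finally block
        }
      }
      return null;
    } finally {
      scanner.close();    // runs on every exit path, return or exception
    }
  }
}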
[03/24] hadoop git commit: YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)
Posted by ar...@apache.org.
YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0e4b0669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0e4b0669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0e4b0669
Branch: refs/heads/HDFS-7240
Commit: 0e4b06690ff51fbde3ab26f68fde8aeb32af69af
Parents: 152e5df
Author: Wangda Tan <wa...@apache.org>
Authored: Wed Jul 1 17:32:22 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Jul 1 17:32:22 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/ResourceManager.java | 34 ---------
.../monitor/SchedulingEditPolicy.java | 6 +-
.../monitor/SchedulingMonitor.java | 3 +-
.../ProportionalCapacityPreemptionPolicy.java | 42 ++++++-----
.../scheduler/ContainerPreemptEvent.java | 8 +-
.../scheduler/ContainerPreemptEventType.java | 26 -------
.../scheduler/capacity/CapacityScheduler.java | 24 ++++++
.../scheduler/event/SchedulerEventType.java | 7 +-
.../resourcemanager/TestRMDispatcher.java | 79 ++++++++++++++++++++
...estProportionalCapacityPreemptionPolicy.java | 19 +++--
...pacityPreemptionPolicyForNodePartitions.java | 12 ++-
12 files changed, 161 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3620a71..6839548 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -601,6 +601,9 @@ Release 2.7.2 - UNRELEASED
YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
Saxena via jlowe)
+ YARN-3508. Prevent processing preemption events on the main RM dispatcher.
+ (Varun Saxena via wangda)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4153ba1..1b606b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -614,9 +613,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
SchedulingEditPolicy.class);
if (policies.size() > 0) {
- rmDispatcher.register(ContainerPreemptEventType.class,
- new RMContainerPreemptEventDispatcher(
- (PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
// periodically check whether we need to take action to guarantee
@@ -787,36 +783,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
@Private
- public static final class
- RMContainerPreemptEventDispatcher
- implements EventHandler<ContainerPreemptEvent> {
-
- private final PreemptableResourceScheduler scheduler;
-
- public RMContainerPreemptEventDispatcher(
- PreemptableResourceScheduler scheduler) {
- this.scheduler = scheduler;
- }
-
- @Override
- public void handle(ContainerPreemptEvent event) {
- ApplicationAttemptId aid = event.getAppId();
- RMContainer container = event.getContainer();
- switch (event.getType()) {
- case DROP_RESERVATION:
- scheduler.dropContainerReservation(container);
- break;
- case PREEMPT_CONTAINER:
- scheduler.preemptContainer(aid, container);
- break;
- case KILL_CONTAINER:
- scheduler.killContainer(container);
- break;
- }
- }
- }
-
- @Private
public static final class ApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
index 1ebc19f..0d587d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
@@ -18,14 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
public interface SchedulingEditPolicy {
- public void init(Configuration config,
- EventHandler<ContainerPreemptEvent> dispatcher,
+ public void init(Configuration config, RMContext context,
PreemptableResourceScheduler scheduler);
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 1682f7d..d4c129b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -54,9 +54,8 @@ public class SchedulingMonitor extends AbstractService {
return scheduleEditPolicy;
}
- @SuppressWarnings("unchecked")
public void serviceInit(Configuration conf) throws Exception {
- scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+ scheduleEditPolicy.init(conf, rmContext,
(PreemptableResourceScheduler) rmContext.getScheduler());
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 1f47b5f..5a20a6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -38,13 +38,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -118,8 +118,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
- // the dispatcher to send preempt and kill events
- public EventHandler<ContainerPreemptEvent> dispatcher;
+ private RMContext rmContext;
private final Clock clock;
private double maxIgnoredOverCapacity;
@@ -141,20 +140,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
public ProportionalCapacityPreemptionPolicy(Configuration config,
- EventHandler<ContainerPreemptEvent> dispatcher,
- CapacityScheduler scheduler) {
- this(config, dispatcher, scheduler, new SystemClock());
+ RMContext context, CapacityScheduler scheduler) {
+ this(config, context, scheduler, new SystemClock());
}
public ProportionalCapacityPreemptionPolicy(Configuration config,
- EventHandler<ContainerPreemptEvent> dispatcher,
- CapacityScheduler scheduler, Clock clock) {
- init(config, dispatcher, scheduler);
+ RMContext context, CapacityScheduler scheduler, Clock clock) {
+ init(config, context, scheduler);
this.clock = clock;
}
- public void init(Configuration config,
- EventHandler<ContainerPreemptEvent> disp,
+ public void init(Configuration config, RMContext context,
PreemptableResourceScheduler sched) {
LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
assert null == scheduler : "Unexpected duplicate call to init";
@@ -163,7 +159,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
sched.getClass().getCanonicalName() + " not instance of " +
CapacityScheduler.class.getCanonicalName());
}
- dispatcher = disp;
+ rmContext = context;
scheduler = (CapacityScheduler) sched;
maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
naturalTerminationFactor =
@@ -196,6 +192,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
*/
+ @SuppressWarnings("unchecked")
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// All partitions to look at
@@ -248,8 +245,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
: toPreempt.entrySet()) {
+ ApplicationAttemptId appAttemptId = e.getKey();
if (LOG.isDebugEnabled()) {
- LOG.debug("Send to scheduler: in app=" + e.getKey()
+ LOG.debug("Send to scheduler: in app=" + appAttemptId
+ " #containers-to-be-preempted=" + e.getValue().size());
}
for (RMContainer container : e.getValue()) {
@@ -257,13 +255,15 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
if (preempted.get(container) != null &&
preempted.get(container) + maxWaitTime < clock.getTime()) {
// kill it
- dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
- ContainerPreemptEventType.KILL_CONTAINER));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(appAttemptId, container,
+ SchedulerEventType.KILL_CONTAINER));
preempted.remove(container);
} else {
//otherwise just send preemption events
- dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
- ContainerPreemptEventType.PREEMPT_CONTAINER));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(appAttemptId, container,
+ SchedulerEventType.PREEMPT_CONTAINER));
if (preempted.get(container) == null) {
preempted.put(container, clock.getTime());
}
@@ -735,6 +735,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
*/
+ @SuppressWarnings("unchecked")
private void preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
@@ -758,8 +759,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
clusterResource, preemptMap);
if (!observeOnly) {
- dispatcher.handle(new ContainerPreemptEvent(appId, c,
- ContainerPreemptEventType.DROP_RESERVATION));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(
+ appId, c, SchedulerEventType.DROP_RESERVATION));
}
}
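With the dispatcher field gone, every emission goes through the context; the @SuppressWarnings("unchecked") annotations added above are presumably needed because Dispatcher#getEventHandler() is not parameterized on the event type. The kill-vs-preempt decision repeated in these hunks condenses to the following sketch (identifiers as in the patch; the preempted-map bookkeeping of remove/put is omitted):

  // Kill once the grace period has elapsed, otherwise keep asking politely.
  Long firstAsked = preempted.get(container);
  SchedulerEventType type =
      (firstAsked != null && firstAsked + maxWaitTime < clock.getTime())
          ? SchedulerEventType.KILL_CONTAINER
          : SchedulerEventType.PREEMPT_CONTAINER;
  rmContext.getDispatcher().getEventHandler().handle(
      new ContainerPreemptEvent(appAttemptId, container, type));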
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
index 8eba48d..7ab2758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
@@ -19,20 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
/**
* Simple event class used to communicate containers unreservations, preemption, killing
*/
-public class ContainerPreemptEvent
- extends AbstractEvent<ContainerPreemptEventType> {
+public class ContainerPreemptEvent extends SchedulerEvent {
private final ApplicationAttemptId aid;
private final RMContainer container;
public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
- ContainerPreemptEventType type) {
+ SchedulerEventType type) {
super(type);
this.aid = aid;
this.container = container;
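Because ContainerPreemptEvent now extends SchedulerEvent, it travels through the ordinary scheduler event dispatcher, which is what makes the dedicated RMContainerPreemptEventDispatcher removed at the top of this patch redundant. Construction is unchanged apart from the type constant:

  ContainerPreemptEvent event = new ContainerPreemptEvent(
      appAttemptId, container, SchedulerEventType.DROP_RESERVATION);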
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
deleted file mode 100644
index a70a836..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-public enum ContainerPreemptEventType {
-
- DROP_RESERVATION,
- PREEMPT_CONTAINER,
- KILL_CONTAINER
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index f1d0f9c..141aa7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -1346,6 +1347,29 @@ public class CapacityScheduler extends
RMContainerEventType.EXPIRE);
}
break;
+ case DROP_RESERVATION:
+ {
+ ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
+ RMContainer container = dropReservationEvent.getContainer();
+ dropContainerReservation(container);
+ }
+ break;
+ case PREEMPT_CONTAINER:
+ {
+ ContainerPreemptEvent preemptContainerEvent =
+ (ContainerPreemptEvent)event;
+ ApplicationAttemptId aid = preemptContainerEvent.getAppId();
+ RMContainer containerToBePreempted = preemptContainerEvent.getContainer();
+ preemptContainer(aid, containerToBePreempted);
+ }
+ break;
+ case KILL_CONTAINER:
+ {
+ ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
+ RMContainer containerToBeKilled = killContainerEvent.getContainer();
+ killContainer(containerToBeKilled);
+ }
+ break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
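Each new case downcasts the generic SchedulerEvent back to ContainerPreemptEvent before delegating to the existing scheduler method. Seen from a caller (for instance the test added below), the round trip is simply:

  // A KILL_CONTAINER event handed to the scheduler ends up in
  // killContainer(container) via the new switch case above.
  scheduler.handle(new ContainerPreemptEvent(appAttemptId, container,
      SchedulerEventType.KILL_CONTAINER));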
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 13aecb3..9de935b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -36,5 +36,10 @@ public enum SchedulerEventType {
APP_ATTEMPT_REMOVED,
// Source: ContainerAllocationExpirer
- CONTAINER_EXPIRED
+ CONTAINER_EXPIRED,
+
+ // Source: SchedulingEditPolicy
+ DROP_RESERVATION,
+ PREEMPT_CONTAINER,
+ KILL_CONTAINER
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
new file mode 100644
index 0000000..db7c96a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMDispatcher {
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testSchedulerEventDispatcherForPreemptionEvents() {
+ AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+ CapacityScheduler sched = spy(new CapacityScheduler());
+ YarnConfiguration conf = new YarnConfiguration();
+ SchedulerEventDispatcher schedulerDispatcher =
+ new SchedulerEventDispatcher(sched);
+ rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
+ rmDispatcher.init(conf);
+ rmDispatcher.start();
+ schedulerDispatcher.init(conf);
+ schedulerDispatcher.start();
+ try {
+ ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
+ RMContainer container = mock(RMContainer.class);
+ ContainerPreemptEvent event1 = new ContainerPreemptEvent(
+ appAttemptId, container, SchedulerEventType.DROP_RESERVATION);
+ rmDispatcher.getEventHandler().handle(event1);
+ ContainerPreemptEvent event2 = new ContainerPreemptEvent(
+ appAttemptId, container, SchedulerEventType.KILL_CONTAINER);
+ rmDispatcher.getEventHandler().handle(event2);
+ ContainerPreemptEvent event3 = new ContainerPreemptEvent(
+ appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER);
+ rmDispatcher.getEventHandler().handle(event3);
+ // Wait for events to be processed by scheduler dispatcher.
+ Thread.sleep(1000);
+ verify(sched, times(3)).handle(any(SchedulerEvent.class));
+ verify(sched).dropContainerReservation(container);
+ verify(sched).preemptContainer(appAttemptId, container);
+ verify(sched).killContainer(container);
+ } catch (InterruptedException e) {
+ Assert.fail();
+ } finally {
+ schedulerDispatcher.stop();
+ rmDispatcher.stop();
+ }
+ }
+}
\ No newline at end of file
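The fixed Thread.sleep(1000) makes this test timing-dependent. If one wanted to avoid the sleep, a possible alternative is Mockito's timeout-based verification, which polls until the expectation holds or the deadline passes:

  import static org.mockito.Mockito.timeout;
  // ...
  verify(sched, timeout(5000).times(3)).handle(any(SchedulerEvent.class));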
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 6c0ed6c..2c0c6d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -66,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -104,7 +106,7 @@ public class TestProportionalCapacityPreemptionPolicy {
RMContext rmContext = null;
RMNodeLabelsManager lm = null;
CapacitySchedulerConfiguration schedConf = null;
- EventHandler<ContainerPreemptEvent> mDisp = null;
+ EventHandler<SchedulerEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
Resource clusterResources = null;
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -164,6 +166,9 @@ public class TestProportionalCapacityPreemptionPolicy {
when(mCS.getRMContext()).thenReturn(rmContext);
when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class);
+ Dispatcher disp = mock(Dispatcher.class);
+ when(rmContext.getDispatcher()).thenReturn(disp);
+ when(disp.getEventHandler()).thenReturn(mDisp);
rand = new Random();
long seed = rand.nextLong();
System.out.println(name.getMethodName() + " SEED: " + seed);
@@ -866,12 +871,12 @@ public class TestProportionalCapacityPreemptionPolicy {
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
- private final ContainerPreemptEventType type;
+ private final SchedulerEventType type;
IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
this(appAttId, PREEMPT_CONTAINER);
}
IsPreemptionRequestFor(ApplicationAttemptId appAttId,
- ContainerPreemptEventType type) {
+ SchedulerEventType type) {
this.appAttId = appAttId;
this.type = type;
}
@@ -888,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
ProportionalCapacityPreemptionPolicy policy =
- new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+ new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
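With the Dispatcher chain stubbed as above, assertions against mDisp keep working unchanged; a typical verification in this test class pairs the mock handler with the IsPreemptionRequestFor matcher shown earlier (sketch; argThat is Mockito's matcher hook, and appA is the test's attempt id):

  verify(mDisp).handle(argThat(new IsPreemptionRequestFor(appA)));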
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index e13320c..114769c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -50,13 +50,13 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -92,7 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
private Configuration conf = null;
private CapacitySchedulerConfiguration csConf = null;
private CapacityScheduler cs = null;
- private EventHandler<ContainerPreemptEvent> mDisp = null;
+ private EventHandler<SchedulerEvent> mDisp = null;
private ProportionalCapacityPreemptionPolicy policy = null;
private Resource clusterResource = null;
@@ -125,11 +126,14 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
rmContext = mock(RMContext.class);
when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+ Dispatcher disp = mock(Dispatcher.class);
+ when(rmContext.getDispatcher()).thenReturn(disp);
+ when(disp.getEventHandler()).thenReturn(mDisp);
csConf = new CapacitySchedulerConfiguration();
when(cs.getConfiguration()).thenReturn(csConf);
when(cs.getRMContext()).thenReturn(rmContext);
- policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+ policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
partitionToResource = new HashMap<>();
nodeIdToSchedulerNodes = new HashMap<>();
nameToCSQueues = new HashMap<>();
@@ -828,7 +832,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
when(cs.getClusterResource()).thenReturn(clusterResource);
mockApplications(appsConfig);
- policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+ policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
}
private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
[23/24] hadoop git commit: HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu)

Moved CHANGES.txt entry to 2.7.2
Posted by ar...@apache.org.
HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu)
Moved CHANGES.txt entry to 2.7.2
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0febce0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0febce0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0febce0
Branch: refs/heads/HDFS-7240
Commit: e0febce0e74ec69597376774f771da46834c42b1
Parents: c40bdb5
Author: Vinayakumar B <vi...@apache.org>
Authored: Tue Jul 7 18:13:35 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Tue Jul 7 18:13:35 2015 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0febce0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 5d11db9..ee96eee 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -930,9 +930,6 @@ Release 2.8.0 - UNRELEASED
HADOOP-12164. Fix TestMove and TestFsShellReturnCode failed to get command
name using reflection. (Lei (Eddy) Xu)
- HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
- multiple times (zhihai xu via vinayakumarb)
-
HADOOP-12117. Potential NPE from Configuration#loadProperty with
allowNullValueProperties set. (zhihai xu via vinayakumarb)
@@ -948,6 +945,9 @@ Release 2.7.2 - UNRELEASED
BUG FIXES
+ HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
+ multiple times (zhihai xu via vinayakumarb)
+
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
[24/24] hadoop git commit: Merge remote-tracking branch 'apache-commit/trunk' into HDFS-7240
Posted by ar...@apache.org.
Merge remote-tracking branch 'apache-commit/trunk' into HDFS-7240
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/83f39a32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/83f39a32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/83f39a32
Branch: refs/heads/HDFS-7240
Commit: 83f39a32c00dc10c3960ceba58210e001b41576d
Parents: 93cebb2 e0febce
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Jul 7 09:00:10 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Jul 7 09:00:10 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 19 +-
.../org/apache/hadoop/conf/Configuration.java | 8 +-
.../apache/hadoop/fs/RawLocalFileSystem.java | 36 +--
.../java/org/apache/hadoop/fs/shell/Mkdir.java | 3 +-
.../apache/hadoop/ha/ActiveStandbyElector.java | 20 +-
.../org/apache/hadoop/ipc/RpcClientUtil.java | 24 ++
.../main/java/org/apache/hadoop/ipc/Server.java | 4 +-
.../org/apache/hadoop/net/NetworkTopology.java | 62 ++---
.../org/apache/hadoop/util/StringUtils.java | 8 +
.../org/apache/hadoop/net/unix/DomainSocket.c | 9 +-
.../apache/hadoop/conf/TestConfiguration.java | 15 ++
.../org/apache/hadoop/fs/SymlinkBaseTest.java | 45 +++-
.../apache/hadoop/fs/TestLocalFileSystem.java | 26 ++-
.../apache/hadoop/fs/TestSymlinkLocalFS.java | 18 ++
.../hadoop/fs/shell/TestCopyPreserveFlag.java | 63 ++---
.../hadoop/ha/TestActiveStandbyElector.java | 31 +++
.../apache/hadoop/net/TestClusterTopology.java | 75 +++++-
.../org/apache/hadoop/util/TestStringUtils.java | 4 +
.../apache/hadoop/hdfs/web/JsonUtilClient.java | 14 ++
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 2 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 17 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 2 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 233 +++++++++++++------
.../hdfs/server/blockmanagement/BlockInfo.java | 7 +-
.../blockmanagement/BlockInfoContiguous.java | 9 +-
.../BlockInfoUnderConstruction.java | 22 +-
.../BlockInfoUnderConstructionContiguous.java | 13 +-
.../server/blockmanagement/BlockManager.java | 143 ++++++------
.../hdfs/server/blockmanagement/BlocksMap.java | 4 +-
.../ContiguousBlockStorageOp.java | 7 +-
.../blockmanagement/CorruptReplicasMap.java | 62 +++--
.../hdfs/server/namenode/FSDirWriteFileOp.java | 6 +-
.../hadoop/hdfs/server/namenode/FSEditLog.java | 28 ++-
.../hdfs/server/namenode/FSEditLogLoader.java | 2 +-
.../hdfs/server/namenode/NamenodeFsck.java | 12 +-
.../apache/hadoop/hdfs/tools/DebugAdmin.java | 10 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 6 +-
.../hadoop/hdfs/TestDFSClientRetries.java | 2 +-
.../blockmanagement/BlockManagerTestUtil.java | 7 +-
.../server/blockmanagement/TestBlockInfo.java | 10 +-
.../blockmanagement/TestBlockManager.java | 10 +-
.../blockmanagement/TestCorruptReplicaInfo.java | 15 +-
.../hadoop/hdfs/server/mover/TestMover.java | 19 ++
.../hdfs/server/namenode/FSXAttrBaseTest.java | 5 +-
.../hadoop/hdfs/tools/TestDebugAdmin.java | 8 +
.../TestOfflineImageViewerForXAttr.java | 3 +
.../org/apache/hadoop/tracing/TestTracing.java | 18 +-
hadoop-mapreduce-project/CHANGES.txt | 5 +-
.../apache/hadoop/mapred/ShuffleHandler.java | 3 +-
.../hadoop/mapred/TestShuffleHandler.java | 101 ++++++++
hadoop-yarn-project/CHANGES.txt | 20 +-
.../logaggregation/AggregatedLogFormat.java | 83 ++++---
...TimelineAuthenticationFilterInitializer.java | 5 +-
.../nodemanager/LinuxContainerExecutor.java | 12 +-
.../linux/privileged/PrivilegedOperation.java | 1 +
.../privileged/PrivilegedOperationExecutor.java | 2 +-
.../logaggregation/AppLogAggregatorImpl.java | 6 +-
.../util/CgroupsLCEResourcesHandler.java | 6 +-
.../container-executor/impl/configuration.c | 8 +-
.../test/test-container-executor.c | 4 +-
.../TestLinuxContainerExecutorWithMocks.java | 13 +-
.../TestLogAggregationService.java | 36 +++
.../server/resourcemanager/ResourceManager.java | 34 ---
.../monitor/SchedulingEditPolicy.java | 6 +-
.../monitor/SchedulingMonitor.java | 3 +-
.../ProportionalCapacityPreemptionPolicy.java | 42 ++--
.../scheduler/ContainerPreemptEvent.java | 8 +-
.../scheduler/ContainerPreemptEventType.java | 26 ---
.../scheduler/capacity/CapacityScheduler.java | 24 ++
.../scheduler/event/SchedulerEventType.java | 7 +-
.../scheduler/fair/FSSchedulerNode.java | 11 +-
.../resourcemanager/TestRMDispatcher.java | 79 +++++++
...estProportionalCapacityPreemptionPolicy.java | 19 +-
...pacityPreemptionPolicyForNodePartitions.java | 12 +-
74 files changed, 1205 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83f39a32/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83f39a32/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83f39a32/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
[04/24] hadoop git commit: HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)
Posted by ar...@apache.org.
HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a78d5074
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a78d5074
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a78d5074
Branch: refs/heads/HDFS-7240
Commit: a78d5074fb3da4779a6b5fd9947e60b9d755ee14
Parents: 0e4b066
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Jul 1 17:57:11 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Wed Jul 1 17:57:11 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 ++
.../org/apache/hadoop/ipc/RpcClientUtil.java | 24 ++++++++++++++++++++
.../main/java/org/apache/hadoop/ipc/Server.java | 4 +++-
.../org/apache/hadoop/tracing/TestTracing.java | 18 +++++++--------
4 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 39e2e5e..24431ba 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -673,6 +673,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12124. Add HTrace support for FsShell (cmccabe)
+ HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
index d9bd71b..da1e699 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
@@ -210,4 +210,28 @@ public class RpcClientUtil {
}
return clazz.getSimpleName() + "#" + method.getName();
}
+
+ /**
+ * Convert an RPC class method to a string.
+ * The format we want is
+ * 'SecondOutermostClassShortName#OutermostClassShortName'.
+ *
+ * For example, if the full class name is:
+ * org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations
+ *
+ * the format we want is:
+ * ClientProtocol#getBlockLocations
+ */
+ public static String toTraceName(String fullName) {
+ int lastPeriod = fullName.lastIndexOf('.');
+ if (lastPeriod < 0) {
+ return fullName;
+ }
+ int secondLastPeriod = fullName.lastIndexOf('.', lastPeriod - 1);
+ if (secondLastPeriod < 0) {
+ return fullName;
+ }
+ return fullName.substring(secondLastPeriod + 1, lastPeriod) + "#" +
+ fullName.substring(lastPeriod + 1);
+ }
}
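The shortening is purely string surgery on the last two dot-separated components; any input with fewer than two periods is returned untouched. For example:

  RpcClientUtil.toTraceName(
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations");
  // returns "ClientProtocol#getBlockLocations"
  RpcClientUtil.toTraceName("getBlockLocations");
  // fewer than two '.' separators, so the input is returned as-is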
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 98fffc0..4026fe0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -1963,7 +1963,9 @@ public abstract class Server {
// If the incoming RPC included tracing info, always continue the trace
TraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(),
header.getTraceInfo().getParentId());
- traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach();
+ traceSpan = Trace.startSpan(
+ RpcClientUtil.toTraceName(rpcRequest.toString()),
+ parentSpan).detach();
}
Call call = new Call(header.getCallId(), header.getRetryCount(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
index 58b3659..c3d2c73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
@@ -67,18 +67,18 @@ public class TestTracing {
String[] expectedSpanNames = {
"testWriteTraceHooks",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
+ "ClientProtocol#create",
"ClientNamenodeProtocol#create",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
+ "ClientProtocol#fsync",
"ClientNamenodeProtocol#fsync",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
+ "ClientProtocol#complete",
"ClientNamenodeProtocol#complete",
"newStreamForCreate",
"DFSOutputStream#write",
"DFSOutputStream#close",
"dataStreamer",
"OpWriteBlockProto",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
+ "ClientProtocol#addBlock",
"ClientNamenodeProtocol#addBlock"
};
SetSpanReceiver.assertSpanNamesFound(expectedSpanNames);
@@ -95,11 +95,11 @@ public class TestTracing {
// and children of them are exception.
String[] spansInTopTrace = {
"testWriteTraceHooks",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
+ "ClientProtocol#create",
"ClientNamenodeProtocol#create",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
+ "ClientProtocol#fsync",
"ClientNamenodeProtocol#fsync",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
+ "ClientProtocol#complete",
"ClientNamenodeProtocol#complete",
"newStreamForCreate",
"DFSOutputStream#write",
@@ -113,7 +113,7 @@ public class TestTracing {
// test for timeline annotation added by HADOOP-11242
Assert.assertEquals("called",
- map.get("org.apache.hadoop.hdfs.protocol.ClientProtocol.create")
+ map.get("ClientProtocol#create")
.get(0).getTimelineAnnotations()
.get(0).getMessage());
@@ -131,7 +131,7 @@ public class TestTracing {
String[] expectedSpanNames = {
"testReadTraceHooks",
- "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
+ "ClientProtocol#getBlockLocations",
"ClientNamenodeProtocol#getBlockLocations",
"OpReadBlockProto"
};
[17/24] hadoop git commit: HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. Contributed by Jing Zhao.
Posted by ar...@apache.org.
HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d62b63d2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d62b63d2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d62b63d2
Branch: refs/heads/HDFS-7240
Commit: d62b63d297bff12d93de560dd50ddd48743b851d
Parents: 47a69ec
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Jul 6 15:54:07 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Jul 6 15:54:25 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hdfs/server/blockmanagement/BlockInfo.java | 7 +-
.../blockmanagement/BlockInfoContiguous.java | 9 +-
.../BlockInfoUnderConstruction.java | 22 ++-
.../BlockInfoUnderConstructionContiguous.java | 13 +-
.../server/blockmanagement/BlockManager.java | 143 +++++++++----------
.../hdfs/server/blockmanagement/BlocksMap.java | 4 +-
.../ContiguousBlockStorageOp.java | 7 +-
.../blockmanagement/CorruptReplicasMap.java | 62 ++++----
.../hdfs/server/namenode/FSDirWriteFileOp.java | 6 +-
.../hdfs/server/namenode/FSEditLogLoader.java | 2 +-
.../hdfs/server/namenode/NamenodeFsck.java | 12 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 6 +-
.../blockmanagement/BlockManagerTestUtil.java | 7 +-
.../server/blockmanagement/TestBlockInfo.java | 10 +-
.../blockmanagement/TestBlockManager.java | 10 +-
.../blockmanagement/TestCorruptReplicaInfo.java | 15 +-
17 files changed, 169 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9edc2af..d264f74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -701,6 +701,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8709. Clarify automatic sync in FSEditLog#logEdit. (wang)
+ HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. (jing9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 5ad992b..4df2f0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -179,7 +179,7 @@ public abstract class BlockInfo extends Block
* information indicating the index of the block in the
* corresponding block group.
*/
- abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
+ abstract void addStorage(DatanodeStorageInfo storage, Block reportedBlock);
/**
* Remove {@link DatanodeStorageInfo} location for a block
@@ -193,6 +193,11 @@ public abstract class BlockInfo extends Block
abstract void replaceBlock(BlockInfo newBlock);
/**
+ * @return true if there is no storage storing the block
+ */
+ abstract boolean hasEmptyStorage();
+
+ /**
* Find specified DatanodeStorageInfo.
* @return DatanodeStorageInfo or null if not found.
*/
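Note the signature change: addStorage used to report whether the storage was actually added, and callers could branch on that boolean; now the addition is unconditional from the caller's perspective and emptiness is queried separately through hasEmptyStorage(). A sketch of the call-site effect (hypothetical caller; the real call sites live in BlockManager and are not all shown in this digest):

  // before: boolean added = blockInfo.addStorage(storage, reportedBlock);
  // after:
  blockInfo.addStorage(storage, reportedBlock);
  if (blockInfo.hasEmptyStorage()) {
    // no DatanodeStorageInfo holds this block any more
  }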
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index de64ad8..561faca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -45,8 +45,8 @@ public class BlockInfoContiguous extends BlockInfo {
}
@Override
- boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
- return ContiguousBlockStorageOp.addStorage(this, storage);
+ void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+ ContiguousBlockStorageOp.addStorage(this, storage);
}
@Override
@@ -73,4 +73,9 @@ public class BlockInfoContiguous extends BlockInfo {
ucBlock.setBlockCollection(getBlockCollection());
return ucBlock;
}
+
+ @Override
+ boolean hasEmptyStorage() {
+ return ContiguousBlockStorageOp.hasEmptyStorage(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index 9cd3987..7924709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -274,18 +273,17 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
"No blocks found, lease removed.");
}
boolean allLiveReplicasTriedAsPrimary = true;
- for (int i = 0; i < replicas.size(); i++) {
+ for (ReplicaUnderConstruction replica : replicas) {
// Check if all replicas have been tried or not.
- if (replicas.get(i).isAlive()) {
- allLiveReplicasTriedAsPrimary =
- (allLiveReplicasTriedAsPrimary &&
- replicas.get(i).getChosenAsPrimary());
+ if (replica.isAlive()) {
+ allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary
+ && replica.getChosenAsPrimary();
}
}
if (allLiveReplicasTriedAsPrimary) {
// Just set all the replicas to be chosen whether they are alive or not.
- for (int i = 0; i < replicas.size(); i++) {
- replicas.get(i).setChosenAsPrimary(false);
+ for (ReplicaUnderConstruction replica : replicas) {
+ replica.setChosenAsPrimary(false);
}
}
long mostRecentLastUpdate = 0;
@@ -345,10 +343,6 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
* Convert an under construction block to a complete block.
*
* @return a complete block.
- * @throws IOException
- * if the state of the block (the generation stamp and the length)
- * has not been committed by the client or it does not have at
- * least a minimal number of replicas reported from data-nodes.
*/
public abstract BlockInfo convertToCompleteBlock();
@@ -386,8 +380,8 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
}
private void appendUCParts(StringBuilder sb) {
- sb.append("{UCState=").append(blockUCState)
- .append(", truncateBlock=" + truncateBlock)
+ sb.append("{UCState=").append(blockUCState).append(", truncateBlock=")
+ .append(truncateBlock)
.append(", primaryNodeIndex=").append(primaryNodeIndex)
.append(", replicas=[");
if (replicas != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
index d3cb337..963f247 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
@@ -55,10 +55,6 @@ public class BlockInfoUnderConstructionContiguous extends
* Convert an under construction block to a complete block.
*
* @return BlockInfo - a complete block.
- * @throws IOException if the state of the block
- * (the generation stamp and the length) has not been committed by
- * the client or it does not have at least a minimal number of replicas
- * reported from data-nodes.
*/
@Override
public BlockInfoContiguous convertToCompleteBlock() {
@@ -69,8 +65,8 @@ public class BlockInfoUnderConstructionContiguous extends
}
@Override
- boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
- return ContiguousBlockStorageOp.addStorage(this, storage);
+ void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+ ContiguousBlockStorageOp.addStorage(this, storage);
}
@Override
@@ -89,6 +85,11 @@ public class BlockInfoUnderConstructionContiguous extends
}
@Override
+ boolean hasEmptyStorage() {
+ return ContiguousBlockStorageOp.hasEmptyStorage(this);
+ }
+
+ @Override
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<>(numLocations);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0b60a97..6ae3ee2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
@@ -197,7 +196,7 @@ public class BlockManager implements BlockStatsMXBean {
* notified of all block deletions that might have been pending
* when the failover happened.
*/
- private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
+ private final Set<BlockInfo> postponedMisreplicatedBlocks = Sets.newHashSet();
/**
* Maps a StorageID to the set of blocks that are "extra" for this
@@ -338,8 +337,7 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
this.shouldCheckForEnoughRacks =
- conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
- ? false : true;
+ conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -465,8 +463,7 @@ public class BlockManager implements BlockStatsMXBean {
/** Should the access keys be updated? */
boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
- return isBlockTokenEnabled()? blockTokenSecretManager.updateKeys(updateTime)
- : false;
+ return isBlockTokenEnabled() && blockTokenSecretManager.updateKeys(updateTime);
}
public void activate(Configuration conf) {
@@ -519,14 +516,14 @@ public class BlockManager implements BlockStatsMXBean {
synchronized (neededReplications) {
out.println("Metasave: Blocks waiting for replication: " +
neededReplications.size());
- for (Block block : neededReplications) {
+ for (BlockInfo block : neededReplications) {
dumpBlockMeta(block, out);
}
}
// Dump any postponed over-replicated blocks
out.println("Mis-replicated blocks that have been postponed:");
- for (Block block : postponedMisreplicatedBlocks) {
+ for (BlockInfo block : postponedMisreplicatedBlocks) {
dumpBlockMeta(block, out);
}
@@ -544,11 +541,9 @@ public class BlockManager implements BlockStatsMXBean {
* Dump the metadata for the given block in a human-readable
* form.
*/
- private void dumpBlockMeta(Block block, PrintWriter out) {
- List<DatanodeDescriptor> containingNodes =
- new ArrayList<DatanodeDescriptor>();
- List<DatanodeStorageInfo> containingLiveReplicasNodes =
- new ArrayList<>();
+ private void dumpBlockMeta(BlockInfo block, PrintWriter out) {
+ List<DatanodeDescriptor> containingNodes = new ArrayList<>();
+ List<DatanodeStorageInfo> containingLiveReplicasNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
@@ -556,17 +551,16 @@ public class BlockManager implements BlockStatsMXBean {
containingLiveReplicasNodes, numReplicas,
UnderReplicatedBlocks.LEVEL);
- // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
- // not included in the numReplicas.liveReplicas() count
+ // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which
+ // are not included in the numReplicas.liveReplicas() count
assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedAndDecommissioning();
-
- if (block instanceof BlockInfo) {
- BlockCollection bc = ((BlockInfo) block).getBlockCollection();
- String fileName = (bc == null) ? "[orphaned]" : bc.getName();
- out.print(fileName + ": ");
- }
+
+ BlockCollection bc = block.getBlockCollection();
+ String fileName = (bc == null) ? "[orphaned]" : bc.getName();
+ out.print(fileName + ": ");
+
// l: == live:, d: == decommissioned c: == corrupt e: == excess
out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
@@ -575,8 +569,8 @@ public class BlockManager implements BlockStatsMXBean {
" c: " + numReplicas.corruptReplicas() +
" e: " + numReplicas.excessReplicas() + ") ");
- Collection<DatanodeDescriptor> corruptNodes =
- corruptReplicas.getNodes(block);
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(block);
for (DatanodeStorageInfo storage : getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -813,7 +807,8 @@ public class BlockManager implements BlockStatsMXBean {
final long offset, final long length, final int nrBlocksToReturn,
final AccessMode mode) throws IOException {
int curBlk;
- long curPos = 0, blkSize = 0;
+ long curPos = 0;
+ long blkSize;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
blkSize = blocks[curBlk].getNumBytes();
@@ -1204,10 +1199,11 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- *
- * @param b
+ * Mark a replica (of a contiguous block) or an internal block (of a striped
+ * block group) as corrupt.
+ * @param b Indicating the reported bad block and the corresponding BlockInfo
+ * stored in blocksMap.
* @param storageInfo storage that contains the block, if known. null otherwise.
- * @throws IOException
*/
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeStorageInfo storageInfo,
@@ -1228,7 +1224,7 @@ public class BlockManager implements BlockStatsMXBean {
}
// Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+ corruptReplicas.addToCorruptReplicasMap(b.stored, node, b.reason,
b.reasonCode);
NumberReplicas numberOfReplicas = countNodes(b.stored);
@@ -1250,7 +1246,7 @@ public class BlockManager implements BlockStatsMXBean {
if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
|| corruptedDuringWrite) {
// the block is over-replicated so invalidate the replicas immediately
- invalidateBlock(b, node);
+ invalidateBlock(b, node, numberOfReplicas);
} else if (namesystem.isPopulatingReplQueues()) {
// add the block to neededReplication
updateNeededReplications(b.stored, -1, 0);
@@ -1258,12 +1254,15 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * Invalidates the given block on the given datanode.
- * @return true if the block was successfully invalidated and no longer
- * present in the BlocksMap
+ * Invalidates the given block on the given datanode. Note that before this
+ * call we have already checked the current live replicas of the block and
+ * make sure it's safe to invalidate the replica.
+ *
+ * @return true if the replica was successfully invalidated and no longer
+ * associated with the DataNode.
*/
- private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
- ) throws IOException {
+ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
+ NumberReplicas nr) throws IOException {
blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn);
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
@@ -1272,35 +1271,30 @@ public class BlockManager implements BlockStatsMXBean {
}
// Check how many copies we have of the block
- NumberReplicas nr = countNodes(b.stored);
if (nr.replicasOnStaleNodes() > 0) {
blockLog.info("BLOCK* invalidateBlocks: postponing " +
"invalidation of {} on {} because {} replica(s) are located on " +
"nodes with potentially out-of-date block reports", b, dn,
nr.replicasOnStaleNodes());
- postponeBlock(b.corrupted);
+ postponeBlock(b.stored);
return false;
- } else if (nr.liveReplicas() >= 1) {
- // If we have at least one copy on a live node, then we can delete it.
+ } else {
+ // we already checked the number of replicas in the caller of this
+ // function and we know there is at least one copy on a live node, so we
+ // can delete it.
addToInvalidates(b.corrupted, dn);
removeStoredBlock(b.stored, node);
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
b, dn);
return true;
- } else {
- blockLog.info("BLOCK* invalidateBlocks: {} on {} is the only copy and" +
- " was not deleted", b, dn);
- return false;
}
}
-
public void setPostponeBlocksFromFuture(boolean postpone) {
this.shouldPostponeBlocksFromFuture = postpone;
}
-
- private void postponeBlock(Block blk) {
+ private void postponeBlock(BlockInfo blk) {
if (postponedMisreplicatedBlocks.add(blk)) {
postponedMisreplicatedBlocksCount.incrementAndGet();
}
@@ -1374,7 +1368,7 @@ public class BlockManager implements BlockStatsMXBean {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
- BlockCollection bc = null;
+ BlockCollection bc;
int additionalReplRequired;
int scheduledWork = 0;
@@ -1535,9 +1529,9 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
- for (int k = 0; k < targets.length; k++) {
+ for (DatanodeStorageInfo target : targets) {
targetList.append(' ');
- targetList.append(targets[k].getDatanodeDescriptor());
+ targetList.append(target.getDatanodeDescriptor());
}
blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
rw.block, targetList);
@@ -1614,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean {
List<DatanodeDescriptor> datanodeDescriptors = null;
if (nodes != null) {
datanodeDescriptors = new ArrayList<>(nodes.size());
- for (int i = 0; i < nodes.size(); i++) {
- DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
+ for (String nodeStr : nodes) {
+ DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodeStr);
if (node != null) {
datanodeDescriptors.add(node);
}
@@ -1654,7 +1648,7 @@ public class BlockManager implements BlockStatsMXBean {
* the given block
*/
@VisibleForTesting
- DatanodeDescriptor chooseSourceDatanode(Block block,
+ DatanodeDescriptor chooseSourceDatanode(BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas,
@@ -1734,16 +1728,16 @@ public class BlockManager implements BlockStatsMXBean {
if (timedOutItems != null) {
namesystem.writeLock();
try {
- for (int i = 0; i < timedOutItems.length; i++) {
+ for (BlockInfo timedOutItem : timedOutItems) {
/*
* Use the blockinfo from the blocksmap to be certain we're working
* with the most up-to-date block information (e.g. genstamp).
*/
- BlockInfo bi = getStoredBlock(timedOutItems[i]);
+ BlockInfo bi = getStoredBlock(timedOutItem);
if (bi == null) {
continue;
}
- NumberReplicas num = countNodes(timedOutItems[i]);
+ NumberReplicas num = countNodes(timedOutItem);
if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) {
neededReplications.add(bi, num.liveReplicas(),
num.decommissionedAndDecommissioning(), getReplication(bi));
@@ -1760,7 +1754,7 @@ public class BlockManager implements BlockStatsMXBean {
public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
assert namesystem.hasReadLock();
- DatanodeDescriptor node = null;
+ DatanodeDescriptor node;
try {
node = datanodeManager.getDatanode(nodeReg);
} catch (UnregisteredNodeException e) {
@@ -2022,7 +2016,7 @@ public class BlockManager implements BlockStatsMXBean {
startIndex += (base+1);
}
}
- Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
+ Iterator<BlockInfo> it = postponedMisreplicatedBlocks.iterator();
for (int tmp = 0; tmp < startIndex; tmp++) {
it.next();
}
@@ -2117,7 +2111,7 @@ public class BlockManager implements BlockStatsMXBean {
long oldGenerationStamp, long oldNumBytes,
DatanodeStorageInfo[] newStorages) throws IOException {
assert namesystem.hasWriteLock();
- BlockToMarkCorrupt b = null;
+ BlockToMarkCorrupt b;
if (block.getGenerationStamp() != oldGenerationStamp) {
b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
"genstamp does not match " + oldGenerationStamp
@@ -2719,7 +2713,7 @@ public class BlockManager implements BlockStatsMXBean {
" but corrupt replicas map has " + corruptReplicasCount);
}
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
- invalidateCorruptReplicas(storedBlock, reportedBlock);
+ invalidateCorruptReplicas(storedBlock, reportedBlock, num);
}
return storedBlock;
}
@@ -2752,18 +2746,20 @@ public class BlockManager implements BlockStatsMXBean {
*
* @param blk Block whose corrupt replicas need to be invalidated
*/
- private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
+ private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
+ NumberReplicas numberReplicas) {
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
boolean removedFromBlocksMap = true;
if (nodes == null)
return;
// make a copy of the array of nodes in order to avoid
// ConcurrentModificationException, when the block is removed from the node
- DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
+ DatanodeDescriptor[] nodesCopy = nodes.toArray(
+ new DatanodeDescriptor[nodes.size()]);
for (DatanodeDescriptor node : nodesCopy) {
try {
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
- Reason.ANY), node)) {
+ Reason.ANY), node, numberReplicas)) {
removedFromBlocksMap = false;
}
} catch (IOException e) {
@@ -2813,7 +2809,6 @@ public class BlockManager implements BlockStatsMXBean {
replicationQueuesInitializer.join();
} catch (final InterruptedException e) {
LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
- return;
} finally {
replicationQueuesInitializer = null;
}
@@ -3175,8 +3170,7 @@ public class BlockManager implements BlockStatsMXBean {
CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
.get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
if (cblock != null) {
- boolean removed = false;
- removed |= node.getPendingCached().remove(cblock);
+ boolean removed = node.getPendingCached().remove(cblock);
removed |= node.getCached().remove(cblock);
removed |= node.getPendingUncached().remove(cblock);
if (removed) {
@@ -3392,7 +3386,7 @@ public class BlockManager implements BlockStatsMXBean {
int excess = 0;
int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
- for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+ for (DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
@@ -3413,7 +3407,8 @@ public class BlockManager implements BlockStatsMXBean {
stale++;
}
}
- return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
+ return new NumberReplicas(live, decommissioned, decommissioning, corrupt,
+ excess, stale);
}
/**
@@ -3596,8 +3591,6 @@ public class BlockManager implements BlockStatsMXBean {
String src, BlockInfo[] blocks) {
for (BlockInfo b: blocks) {
if (!b.isComplete()) {
- final BlockInfoUnderConstruction uc =
- (BlockInfoUnderConstruction)b;
final int numNodes = b.numNodes();
final int min = getMinStorageNum(b);
final BlockUCState state = b.getBlockUCState();
@@ -3723,11 +3716,7 @@ public class BlockManager implements BlockStatsMXBean {
return blocksMap.getBlockCollection(b);
}
- public int numCorruptReplicas(Block block) {
- return corruptReplicas.numCorruptReplicas(block);
- }
-
- public void removeBlockFromMap(Block block) {
+ public void removeBlockFromMap(BlockInfo block) {
removeFromExcessReplicateMap(block);
blocksMap.removeBlock(block);
// If block is removed from blocksMap remove it from corruptReplicasMap
@@ -3737,7 +3726,7 @@ public class BlockManager implements BlockStatsMXBean {
/**
* If a block is removed from blocksMap, remove it from excessReplicateMap.
*/
- private void removeFromExcessReplicateMap(Block block) {
+ private void removeFromExcessReplicateMap(BlockInfo block) {
for (DatanodeStorageInfo info : getStorages(block)) {
String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
LightWeightLinkedSet<BlockInfo> excessReplicas =
@@ -3768,14 +3757,14 @@ public class BlockManager implements BlockStatsMXBean {
/**
* Get the replicas which are corrupt for a given block.
*/
- public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
+ public Collection<DatanodeDescriptor> getCorruptReplicas(BlockInfo block) {
return corruptReplicas.getNodes(block);
}
/**
* Get reason for certain corrupted replicas for a given block and a given dn.
*/
- public String getCorruptReason(Block block, DatanodeDescriptor node) {
+ public String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
return corruptReplicas.getCorruptReason(block, node);
}
@@ -3869,7 +3858,7 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
postponedMisreplicatedBlocksCount.set(0);
- };
+ }
public static LocatedBlock newLocatedBlock(
ExtendedBlock b, DatanodeStorageInfo[] storages,
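Taken together, the invalidateBlock hunks above move the countNodes() call out of the method and into its callers, so the live-replica safety check and the invalidation act on the same replica snapshot. A condensed, hypothetical sketch of the new calling pattern (minReplication stands in for the actual sufficiency condition, which is more involved in the real code):

    // Caller counts replicas exactly once...
    NumberReplicas nr = countNodes(b.stored);
    boolean hasEnoughLiveReplicas = nr.liveReplicas() >= minReplication;
    if (hasEnoughLiveReplicas) {
      // ...and passes the same snapshot down; invalidateBlock no longer
      // re-counts, and may assume at least one live copy exists.
      invalidateBlock(b, node, nr);
    }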
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 0dbf485..85cea5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -117,7 +117,7 @@ class BlocksMap {
* remove it from all data-node lists it belongs to;
* and remove all data-node locations associated with the block.
*/
- void removeBlock(Block block) {
+ void removeBlock(BlockInfo block) {
BlockInfo blockInfo = blocks.remove(block);
if (blockInfo == null)
return;
@@ -190,7 +190,7 @@ class BlocksMap {
// remove block from the data-node list and the node from the block info
boolean removed = node.removeBlock(info);
- if (info.getDatanode(0) == null // no datanodes left
+ if (info.hasEmptyStorage() // no datanodes left
&& info.isDeleted()) { // does not belong to a file
blocks.remove(b); // remove block from the map
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
index 092f65e..70251e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
@@ -45,13 +45,12 @@ class ContiguousBlockStorageOp {
return last;
}
- static boolean addStorage(BlockInfo b, DatanodeStorageInfo storage) {
+ static void addStorage(BlockInfo b, DatanodeStorageInfo storage) {
// find the last null node
int lastNode = ensureCapacity(b, 1);
b.setStorageInfo(lastNode, storage);
b.setNext(lastNode, null);
b.setPrevious(lastNode, null);
- return true;
}
static boolean removeStorage(BlockInfo b,
@@ -103,4 +102,8 @@ class ContiguousBlockStorageOp {
"newBlock already exists.");
}
}
+
+ static boolean hasEmptyStorage(BlockInfo b) {
+ return b.getStorageInfo(0) == null;
+ }
}
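The new hasEmptyStorage helper gives callers a named predicate instead of a raw probe of the storage array, under the assumption that storages are packed from index 0, so a null first slot means no DataNode still reports the block. The caller side, as it appears in the BlocksMap hunk above, restated with that rationale:

    boolean removed = node.removeBlock(info);
    if (info.hasEmptyStorage()   // no DataNodes left reporting the block
        && info.isDeleted()) {   // block no longer belongs to a file
      blocks.remove(b);          // safe to drop it from the map entirely
    }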
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
index fc2e234..9a0023d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
@@ -17,8 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.Server;
@@ -46,8 +46,12 @@ public class CorruptReplicasMap{
CORRUPTION_REPORTED // client or datanode reported the corruption
}
- private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
- new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
+ /**
+ * Used to track corrupted replicas (for contiguous block) or internal blocks
+ * (for striped block) and the corresponding DataNodes. For a striped block,
+ * the key here is the striped block group object stored in the blocksMap.
+ */
+ private final SortedMap<BlockInfo, Map<DatanodeDescriptor, Reason>> corruptReplicasMap = new TreeMap<>();
/**
* Mark the block belonging to datanode as corrupt.
@@ -57,21 +61,21 @@ public class CorruptReplicasMap{
* @param reason a textual reason (for logging purposes)
* @param reasonCode the enum representation of the reason
*/
- void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+ void addToCorruptReplicasMap(BlockInfo blk, DatanodeDescriptor dn,
String reason, Reason reasonCode) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null) {
- nodes = new HashMap<DatanodeDescriptor, Reason>();
+ nodes = new HashMap<>();
corruptReplicasMap.put(blk, nodes);
}
-
+
String reasonText;
if (reason != null) {
reasonText = " because " + reason;
} else {
reasonText = "";
}
-
+
if (!nodes.keySet().contains(dn)) {
NameNode.blockStateChangeLog.info(
"BLOCK NameSystem.addToCorruptReplicasMap: {} added as corrupt on "
@@ -92,7 +96,7 @@ public class CorruptReplicasMap{
*
* @param blk Block to be removed
*/
- void removeFromCorruptReplicasMap(Block blk) {
+ void removeFromCorruptReplicasMap(BlockInfo blk) {
if (corruptReplicasMap != null) {
corruptReplicasMap.remove(blk);
}
@@ -105,12 +109,13 @@ public class CorruptReplicasMap{
* @return true if the removal is successful;
false if the replica is not in the map
*/
- boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
+ boolean removeFromCorruptReplicasMap(BlockInfo blk,
+ DatanodeDescriptor datanode) {
return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
}
- boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
- Reason reason) {
+ boolean removeFromCorruptReplicasMap(BlockInfo blk,
+ DatanodeDescriptor datanode, Reason reason) {
Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
if (datanodes==null)
return false;
@@ -139,11 +144,9 @@ public class CorruptReplicasMap{
* @param blk Block for which nodes are requested
* @return collection of nodes. Null if does not exists
*/
- Collection<DatanodeDescriptor> getNodes(Block blk) {
- Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
- if (nodes == null)
- return null;
- return nodes.keySet();
+ Collection<DatanodeDescriptor> getNodes(BlockInfo blk) {
+ Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+ return nodes != null ? nodes.keySet() : null;
}
/**
@@ -153,12 +156,12 @@ public class CorruptReplicasMap{
* @param node DatanodeDescriptor which holds the replica
* @return true if replica is corrupt, false if does not exists in this map
*/
- boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
+ boolean isReplicaCorrupt(BlockInfo blk, DatanodeDescriptor node) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
return ((nodes != null) && (nodes.contains(node)));
}
- int numCorruptReplicas(Block blk) {
+ int numCorruptReplicas(BlockInfo blk) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
return (nodes == null) ? 0 : nodes.size();
}
@@ -168,9 +171,9 @@ public class CorruptReplicasMap{
}
/**
- * Return a range of corrupt replica block ids. Up to numExpectedBlocks
+ * Return a range of corrupt replica block ids. Up to numExpectedBlocks
* blocks starting at the next block after startingBlockId are returned
- * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
+ * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
* is null, up to numExpectedBlocks blocks are returned from the beginning.
* If startingBlockId cannot be found, null is returned.
*
@@ -181,44 +184,39 @@ public class CorruptReplicasMap{
* @return Up to numExpectedBlocks blocks from startingBlockId if it exists
*
*/
+ @VisibleForTesting
long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
Long startingBlockId) {
if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
return null;
}
-
- Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
-
+ Iterator<BlockInfo> blockIt = corruptReplicasMap.keySet().iterator();
// if the starting block id was specified, iterate over keys until
// we find the matching block. If we find a matching block, break
- // to leave the iterator on the next block after the specified block.
+ // to leave the iterator on the next block after the specified block.
if (startingBlockId != null) {
boolean isBlockFound = false;
while (blockIt.hasNext()) {
- Block b = blockIt.next();
+ BlockInfo b = blockIt.next();
if (b.getBlockId() == startingBlockId) {
isBlockFound = true;
- break;
+ break;
}
}
-
if (!isBlockFound) {
return null;
}
}
- ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
-
+ ArrayList<Long> corruptReplicaBlockIds = new ArrayList<>();
// append up to numExpectedBlocks blockIds to our list
for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
corruptReplicaBlockIds.add(blockIt.next().getBlockId());
}
-
long[] ret = new long[corruptReplicaBlockIds.size()];
for(int i=0; i<ret.length; i++) {
ret[i] = corruptReplicaBlockIds.get(i);
}
-
return ret;
}
@@ -229,7 +227,7 @@ public class CorruptReplicasMap{
* @param node datanode that contains this corrupted replica
* @return reason
*/
- String getCorruptReason(Block block, DatanodeDescriptor node) {
+ String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
Reason reason = null;
if(corruptReplicasMap.containsKey(block)) {
if (corruptReplicasMap.get(block).containsKey(node)) {
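With corruptReplicasMap now keyed on the BlockInfo objects held in the blocksMap, a caller holding only a raw Block must resolve it first; matching by block identity alone no longer finds the entry. A hedged sketch of the resolution pattern, mirroring the BlockManagerTestUtil change further below (the null guard is added here for illustration):

    // Resolve the reported Block to its canonical BlockInfo before any
    // corrupt-replica lookup; a plain Block key would miss the entry.
    BlockInfo stored = blockManager.getStoredBlock(reportedBlock);
    int corrupt = (stored == null)
        ? 0 : corruptReplicas.numCorruptReplicas(stored);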
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 4830d5d..eebeac0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -71,7 +71,7 @@ class FSDirWriteFileOp {
private FSDirWriteFileOp() {}
static boolean unprotectedRemoveBlock(
FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
- Block block) throws IOException {
+ BlockInfo block) throws IOException {
// modify file-> block and blocksMap
// fileNode should be under construction
BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
@@ -136,7 +136,9 @@ class FSDirWriteFileOp {
fsd.writeLock();
try {
// Remove the block from the pending creates list
- if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
+ BlockInfo storedBlock = fsd.getBlockManager().getStoredBlock(localBlock);
+ if (storedBlock != null &&
+ !unprotectedRemoveBlock(fsd, src, iip, file, storedBlock)) {
return;
}
} finally {
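The guard in the hunk above also covers a block that has already vanished from the blocksMap: the client-supplied localBlock is resolved first, and removal is attempted only when a stored copy still exists. Restated with the rationale as comments:

    BlockInfo storedBlock = fsd.getBlockManager().getStoredBlock(localBlock);
    // If the block was already removed (storedBlock == null), skip the
    // removal rather than passing a raw Block the map can no longer match.
    if (storedBlock != null &&
        !unprotectedRemoveBlock(fsd, src, iip, file, storedBlock)) {
      return;
    }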
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 63ef985..96d6982 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -1035,7 +1035,7 @@ public class FSEditLogLoader {
throw new IOException("Trying to remove more than one block from file "
+ path);
}
- Block oldBlock = oldBlocks[oldBlocks.length - 1];
+ BlockInfo oldBlock = oldBlocks[oldBlocks.length - 1];
boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
fsDir, path, iip, file, oldBlock);
if (!removed && !(op instanceof UpdateBlocksOp)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index ab179b4..2a8231a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -267,10 +267,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.println("No. of corrupted Replica: " +
numberReplicas.corruptReplicas());
//record datanodes that have corrupted block replica
- Collection<DatanodeDescriptor> corruptionRecord = null;
- if (bm.getCorruptReplicas(block) != null) {
- corruptionRecord = bm.getCorruptReplicas(block);
- }
+ Collection<DatanodeDescriptor> corruptionRecord =
+ bm.getCorruptReplicas(blockInfo);
//report block replicas status on datanodes
for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) {
@@ -279,7 +277,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
dn.getNetworkLocation() + " ");
if (corruptionRecord != null && corruptionRecord.contains(dn)) {
out.print(CORRUPT_STATUS+"\t ReasonCode: "+
- bm.getCorruptReason(block,dn));
+ bm.getCorruptReason(blockInfo, dn));
} else if (dn.isDecommissioned() ){
out.print(DECOMMISSIONED_STATUS);
} else if (dn.isDecommissionInProgress()) {
@@ -650,7 +648,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
LightWeightLinkedSet<BlockInfo> blocksExcess =
bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
Collection<DatanodeDescriptor> corruptReplicas =
- bm.getCorruptReplicas(block.getLocalBlock());
+ bm.getCorruptReplicas(storedBlock);
sb.append("(");
if (dnDesc.isDecommissioned()) {
sb.append("DECOMMISSIONED)");
@@ -658,7 +656,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
sb.append("DECOMMISSIONING)");
} else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) {
sb.append("CORRUPT)");
- } else if (blocksExcess != null && blocksExcess.contains(block.getLocalBlock())) {
+ } else if (blocksExcess != null && blocksExcess.contains(storedBlock)) {
sb.append("EXCESS)");
} else if (dnDesc.isStale(this.staleInterval)) {
sb.append("STALE_NODE)");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 89ee674..af1e023 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -560,7 +560,8 @@ public class DFSTestUtil {
throws TimeoutException, InterruptedException {
int count = 0;
final int ATTEMPTS = 50;
- int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
+ int repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
+ b.getLocalBlock());
while (repls != corruptRepls && count < ATTEMPTS) {
try {
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
@@ -572,7 +573,8 @@ public class DFSTestUtil {
count++;
// check more often so corrupt block reports are not easily missed
for (int i = 0; i < 10; i++) {
- repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
+ repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
+ b.getLocalBlock());
Thread.sleep(100);
if (repls == corruptRepls) {
break;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 148135b..a899891 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -87,7 +88,7 @@ public class BlockManagerTestUtil {
final Block b) {
final Set<String> rackSet = new HashSet<String>(0);
final Collection<DatanodeDescriptor> corruptNodes =
- getCorruptReplicas(blockManager).getNodes(b);
+ getCorruptReplicas(blockManager).getNodes(blockManager.getStoredBlock(b));
for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@@ -306,4 +307,8 @@ public class BlockManagerTestUtil {
throws ExecutionException, InterruptedException {
dm.getDecomManager().runMonitor();
}
+
+ public static int numCorruptReplicas(BlockManager bm, Block block) {
+ return bm.corruptReplicas.numCorruptReplicas(bm.getStoredBlock(block));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index bae4f1d..c23f3d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -63,9 +63,7 @@ public class TestBlockInfo {
final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
- boolean added = blockInfo.addStorage(storage, blockInfo);
-
- Assert.assertTrue(added);
+ blockInfo.addStorage(storage, blockInfo);
Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
}
@@ -73,7 +71,7 @@ public class TestBlockInfo {
public void testCopyConstructor() {
BlockInfo old = new BlockInfoContiguous((short) 3);
try {
- BlockInfo copy = new BlockInfoContiguous((BlockInfoContiguous)old);
+ BlockInfo copy = new BlockInfoContiguous(old);
assertEquals(old.getBlockCollection(), copy.getBlockCollection());
assertEquals(old.getCapacity(), copy.getCapacity());
} catch (Exception e) {
@@ -110,8 +108,8 @@ public class TestBlockInfo {
final int MAX_BLOCKS = 10;
DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
- ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
- ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
+ ArrayList<Block> blockList = new ArrayList<>(MAX_BLOCKS);
+ ArrayList<BlockInfo> blockInfoList = new ArrayList<>();
int headIndex;
int curIndex;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 9e31670..f6cc747 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -509,7 +509,7 @@ public class TestBlockManager {
+ " even if all available source nodes have reached their replication"
+ " limits below the hard limit.",
bm.chooseSourceDatanode(
- aBlock,
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@@ -519,7 +519,7 @@ public class TestBlockManager {
+ " replication since all available source nodes have reached"
+ " their replication limits.",
bm.chooseSourceDatanode(
- aBlock,
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@@ -532,7 +532,7 @@ public class TestBlockManager {
assertNull("Does not choose a source node for a highest-priority"
+ " replication when all available nodes exceed the hard limit.",
bm.chooseSourceDatanode(
- aBlock,
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@@ -558,7 +558,7 @@ public class TestBlockManager {
+ " if all available source nodes have reached their replication"
+ " limits below the hard limit.",
bm.chooseSourceDatanode(
- aBlock,
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@@ -572,7 +572,7 @@ public class TestBlockManager {
assertNull("Does not choose a source decommissioning node for a normal"
+ " replication when all available nodes exceed the hard limit.",
bm.chooseSourceDatanode(
- aBlock,
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
index 21fb54e..1a49bee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
@@ -48,20 +48,19 @@ public class TestCorruptReplicaInfo {
private static final Log LOG =
LogFactory.getLog(TestCorruptReplicaInfo.class);
- private final Map<Long, Block> block_map =
- new HashMap<Long, Block>();
+ private final Map<Long, BlockInfo> block_map = new HashMap<>();
// Allow easy block creation by block id
// Return existing block if one with same block id already exists
- private Block getBlock(Long block_id) {
+ private BlockInfo getBlock(Long block_id) {
if (!block_map.containsKey(block_id)) {
- block_map.put(block_id, new Block(block_id,0,0));
+ block_map.put(block_id,
+ new BlockInfoContiguous(new Block(block_id, 0, 0), (short) 1));
}
-
return block_map.get(block_id);
}
- private Block getBlock(int block_id) {
+ private BlockInfo getBlock(int block_id) {
return getBlock((long)block_id);
}
@@ -82,7 +81,7 @@ public class TestCorruptReplicaInfo {
// create a list of block_ids. A list is used to allow easy validation of the
// output of getCorruptReplicaBlockIds
int NUM_BLOCK_IDS = 140;
- List<Long> block_ids = new LinkedList<Long>();
+ List<Long> block_ids = new LinkedList<>();
for (int i=0;i<NUM_BLOCK_IDS;i++) {
block_ids.add((long)i);
}
@@ -130,7 +129,7 @@ public class TestCorruptReplicaInfo {
}
private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
- Block blk, DatanodeDescriptor dn) {
+ BlockInfo blk, DatanodeDescriptor dn) {
crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE);
}
}
[12/24] hadoop git commit: MAPREDUCE-6425. ShuffleHandler passes
wrong "base" parameter to getMapOutputInfo if mapId is not in the cache.
Contributed by zhihai xu.
Posted by ar...@apache.org.
MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to
getMapOutputInfo if mapId is not in the cache. Contributed by zhihai xu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff67dfe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff67dfe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff67dfe
Branch: refs/heads/HDFS-7240
Commit: bff67dfe2f811654ffb1bbcbd87509c185f452b6
Parents: 688617d
Author: Devaraj K <de...@apache.org>
Authored: Mon Jul 6 13:46:37 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Mon Jul 6 13:46:37 2015 +0530
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../apache/hadoop/mapred/ShuffleHandler.java | 3 +-
.../hadoop/mapred/TestShuffleHandler.java | 101 +++++++++++++++++++
3 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2f80615..2458403 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -542,6 +542,9 @@ Release 2.7.2 - UNRELEASED
BUG FIXES
+ MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo
+ if mapId is not in the cache. (zhihai xu via devaraj)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index eedf42b..ee1be23 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -815,7 +815,8 @@ public class ShuffleHandler extends AuxiliaryService {
try {
MapOutputInfo info = mapOutputInfoMap.get(mapId);
if (info == null) {
- info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+ info = getMapOutputInfo(outputBasePathStr + mapId,
+ mapId, reduceId, user);
}
lastMap =
sendMapOutput(ctx, ch, user, mapId,
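The one-line fix above hinges on what "base" means to getMapOutputInfo: it expects a per-attempt directory, while outputBasePathStr appears to stop at the job-level output prefix (the cached MapOutputInfo entries already had the attempt folded in). On a cache miss the handler must therefore append the mapId itself. A hedged illustration; the example paths are hypothetical:

    // outputBasePathStr ends at the job output directory, e.g.
    //   .../appcache/application_12345_0001/output/
    // while getMapOutputInfo expects the attempt directory, e.g.
    //   .../output/attempt_12345_1_m_1_0
    MapOutputInfo info = mapOutputInfoMap.get(mapId);
    if (info == null) {
      info = getMapOutputInfo(outputBasePathStr + mapId, mapId, reduceId, user);
    }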
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 7053653..746071f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -601,6 +601,7 @@ public class TestShuffleHandler {
Assert.assertTrue((new String(byteArr)).contains(message));
} finally {
shuffleHandler.stop();
+ FileUtil.fullyDelete(absLogDir);
}
}
@@ -829,4 +830,104 @@ public class TestShuffleHandler {
conn.disconnect();
return rc;
}
+
+ @Test(timeout = 100000)
+ public void testGetMapOutputInfo() throws Exception {
+ final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ File absLogDir = new File("target", TestShuffleHandler.class.
+ getSimpleName() + "LocDir").getAbsoluteFile();
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+ ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ String appAttemptId = "attempt_12345_1_m_1_0";
+ String user = "randomUser";
+ String reducerId = "0";
+ List<File> fileMap = new ArrayList<File>();
+ createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+ conf, fileMap);
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
+ @Override
+ protected Shuffle getShuffle(Configuration conf) {
+ // replace the shuffle handler with one stubbed for testing
+ return new Shuffle(conf) {
+ @Override
+ protected void populateHeaders(List<String> mapIds,
+ String outputBaseStr, String user, int reduce,
+ HttpRequest request, HttpResponse response,
+ boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
+ throws IOException {
+ // Only set response headers and skip everything else
+ // send some dummy value for content-length
+ super.setResponseHeaders(response, keepAliveParam, 100);
+ }
+ @Override
+ protected void verifyRequest(String appid,
+ ChannelHandlerContext ctx, HttpRequest request,
+ HttpResponse response, URL requestUri) throws IOException {
+ // Do nothing.
+ }
+ @Override
+ protected void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ if (failures.size() == 0) {
+ failures.add(new Error(message));
+ ctx.getChannel().close();
+ }
+ }
+ @Override
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+ Channel ch, String user, String mapId, int reduce,
+ MapOutputInfo info) throws IOException {
+ // send a shuffle header
+ ShuffleHeader header =
+ new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ }
+ };
+ }
+ };
+ shuffleHandler.init(conf);
+ try {
+ shuffleHandler.start();
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt =
+ new Token<JobTokenIdentifier>("identifier".getBytes(),
+ "password".getBytes(), new Text(user), new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffleHandler
+ .initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+ URL url =
+ new URL(
+ "http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ + "&map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ try {
+ DataInputStream is = new DataInputStream(conn.getInputStream());
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(is);
+ is.close();
+ } catch (EOFException e) {
+ // ignore
+ }
+ Assert.assertEquals(failures.size(), 0);
+ } finally {
+ shuffleHandler.stop();
+ FileUtil.fullyDelete(absLogDir);
+ }
+ }
}

[05/24] hadoop git commit: HADOOP-12172. FsShell mkdir -p makes an
unnecessary check for the existence of the parent. Contributed by Chris
Nauroth.
Posted by ar...@apache.org.
HADOOP-12172. FsShell mkdir -p makes an unnecessary check for the existence of the parent. Contributed by Chris Nauroth.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f3796224
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f3796224
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f3796224
Branch: refs/heads/HDFS-7240
Commit: f3796224bfdfd88e2428cc8a9915bdfdc62b48f3
Parents: a78d507
Author: cnauroth <cn...@apache.org>
Authored: Wed Jul 1 19:47:58 2015 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Wed Jul 1 19:47:58 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++
.../src/main/java/org/apache/hadoop/fs/shell/Mkdir.java | 3 ++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3796224/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 24431ba..312a996 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -704,6 +704,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12112. Make hadoop-common-project Native code -Wall-clean
(alanburlison via cmccabe)
+ HADOOP-12172. FsShell mkdir -p makes an unnecessary check for the existence
+ of the parent. (cnauroth)
+
BUG FIXES
HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3796224/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
index 74bad62..9f39da2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
@@ -70,7 +70,8 @@ class Mkdir extends FsCommand {
protected void processNonexistentPath(PathData item) throws IOException {
// check if parent exists. this is complicated because getParent(a/b/c/) returns a/b/c, but
// we want a/b
- if (!item.fs.exists(new Path(item.path.toString()).getParent()) && !createParents) {
+ if (!createParents &&
+ !item.fs.exists(new Path(item.path.toString()).getParent())) {
throw new PathNotFoundException(item.toString());
}
if (!item.fs.mkdirs(item.path)) {
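Reordering the condition is the whole optimization: Java's && short-circuits, so when -p sets createParents the exists() call, which on HDFS is a full round trip to the NameNode, is never issued. The same hunk restated with that rationale:

    // With -p (createParents == true), !createParents is false and the
    // fs.exists() RPC on the right-hand side is skipped entirely.
    if (!createParents &&
        !item.fs.exists(new Path(item.path.toString()).getParent())) {
      throw new PathNotFoundException(item.toString());
    }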
[14/24] hadoop git commit: HDFS-8686. WebHdfsFileSystem#getXAttr(Path p,
final String name) doesn't work if namespace is in capitals (Contributed
by kanaka kumar avvaru)
Posted by ar...@apache.org.
HDFS-8686. WebHdfsFileSystem#getXAttr(Path p, final String name) doesn't work if namespace is in capitals (Contributed by kanaka kumar avvaru)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc92d3e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc92d3e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc92d3e6
Branch: refs/heads/HDFS-7240
Commit: fc92d3e6515a391847cb6170244b3d911712d96a
Parents: 233cab8
Author: Vinayakumar B <vi...@apache.org>
Authored: Mon Jul 6 16:09:24 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Mon Jul 6 16:09:24 2015 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/web/JsonUtilClient.java | 14 ++++++++++++++
.../org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java | 2 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++
.../hadoop/hdfs/server/namenode/FSXAttrBaseTest.java | 5 ++++-
.../TestOfflineImageViewerForXAttr.java | 3 +++
5 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
index e025e31..713836c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
@@ -413,6 +413,20 @@ class JsonUtilClient {
return null;
}
+ /** Expecting only single XAttr in the map. return its value */
+ static byte[] getXAttr(final Map<?, ?> json) throws IOException {
+ if (json == null) {
+ return null;
+ }
+
+ Map<String, byte[]> xAttrs = toXAttrs(json);
+ if (xAttrs != null && !xAttrs.values().isEmpty()) {
+ return xAttrs.values().iterator().next();
+ }
+
+ return null;
+ }
+
static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
throws IOException {
if (json == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 2650dca..b661d07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -963,7 +963,7 @@ public class WebHdfsFileSystem extends FileSystem
new XAttrEncodingParam(XAttrCodec.HEX)) {
@Override
byte[] decodeResponse(Map<?, ?> json) throws IOException {
- return JsonUtilClient.getXAttr(json, name);
+ return JsonUtilClient.getXAttr(json);
}
}.run();
}
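The client-side fix works because the GETXATTRS request already names the attribute, so the response is expected to carry exactly one entry; the old code re-matched the caller's string against the JSON keys and missed when the server returned the namespace in its canonical lower case. A sketch of the new helper's core, condensed from the JsonUtilClient hunk above:

    Map<String, byte[]> xAttrs = toXAttrs(json);
    if (xAttrs != null && !xAttrs.values().isEmpty()) {
      // Single attribute expected: return its value without comparing names.
      return xAttrs.values().iterator().next();
    }
    return null;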
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4f184fb..9edc2af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1005,6 +1005,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8577. Avoid retrying to recover lease on a file which does not exist
(J.Andreina via vinayakumarb)
+ HDFS-8686. WebHdfsFileSystem#getXAttr(Path p, final String name) doesn't
+ work if namespace is in capitals (kanaka kumar avvaru via vinayakumarb)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
index e21e34c..eb9053c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
@@ -395,7 +395,10 @@ public class FSXAttrBaseTest {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
-
+
+ final byte[] theValue = fs.getXAttr(path, "USER.a2");
+ Assert.assertArrayEquals(value2, theValue);
+
/* An XAttr that was requested does not exist. */
try {
final byte[] value = fs.getXAttr(path, name3);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
index 3f23f64..6c82101 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
@@ -231,6 +231,9 @@ public class TestOfflineImageViewerForXAttr {
"user.attr1"));
assertEquals("value1", value);
+ value = new String(webhdfs.getXAttr(new Path("/dir1"), "USER.attr1"));
+ assertEquals("value1", value);
+
Map<String, byte[]> contentMap = webhdfs.getXAttrs(new Path("/dir1"),
names);
[15/24] hadoop git commit: HADOOP-12045. Enable
LocalFileSystem#setTimes to change atime. Contributed by Kazuho Fujii.
Posted by ar...@apache.org.
HADOOP-12045. Enable LocalFileSystem#setTimes to change atime. Contributed by Kazuho Fujii.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed1e3ce4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed1e3ce4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed1e3ce4
Branch: refs/heads/HDFS-7240
Commit: ed1e3ce482f679ae2fad43a203f6578d7af59327
Parents: fc92d3e
Author: cnauroth <cn...@apache.org>
Authored: Mon Jul 6 13:40:15 2015 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Mon Jul 6 13:40:15 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../apache/hadoop/fs/RawLocalFileSystem.java | 36 ++++++-----
.../org/apache/hadoop/fs/SymlinkBaseTest.java | 45 +++++++++++---
.../apache/hadoop/fs/TestLocalFileSystem.java | 26 ++++++--
.../apache/hadoop/fs/TestSymlinkLocalFS.java | 18 ++++++
.../hadoop/fs/shell/TestCopyPreserveFlag.java | 63 ++++++++++----------
6 files changed, 132 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1d737e5..f2f9d5c 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -675,6 +675,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)
+ HADOOP-12045. Enable LocalFileSystem#setTimes to change atime.
+ (Kazuho Fujii via cnauroth)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index 96d1ab4..ac65b62 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -33,6 +33,10 @@ import java.io.OutputStream;
import java.io.FileDescriptor;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.BasicFileAttributeView;
+import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.StringTokenizer;
@@ -644,9 +648,14 @@ public class RawLocalFileSystem extends FileSystem {
return !super.getOwner().isEmpty();
}
- DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
+ DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)
+ throws IOException {
super(f.length(), f.isDirectory(), 1, defaultBlockSize,
- f.lastModified(), new Path(f.getPath()).makeQualified(fs.getUri(),
+ f.lastModified(),
+ Files.readAttributes(f.toPath(),
+ BasicFileAttributes.class).lastAccessTime().toMillis(),
+ null, null, null,
+ new Path(f.getPath()).makeQualified(fs.getUri(),
fs.getWorkingDirectory()));
}
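A standalone sketch of the java.nio call the patched constructor uses to surface the access time (the temp file is illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.attribute.BasicFileAttributes;

    public class ReadAtimeSketch {
      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("atime", ".tmp");
        BasicFileAttributes attrs =
            Files.readAttributes(f.toPath(), BasicFileAttributes.class);
        // lastAccessTime() is what the patched constructor now feeds
        // into FileStatus; lastModifiedTime() matches File#lastModified().
        System.out.println("atime = " + attrs.lastAccessTime().toMillis());
        System.out.println("mtime = " + attrs.lastModifiedTime().toMillis());
        f.delete();
      }
    }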
@@ -758,25 +767,20 @@ public class RawLocalFileSystem extends FileSystem {
}
/**
- * Sets the {@link Path}'s last modified time <em>only</em> to the given
- * valid time.
+ * Sets the {@link Path}'s last modified time and last access time to
+ * the given valid times.
*
* @param mtime the modification time to set (only if greater than zero).
- * @param atime currently ignored.
- * @throws IOException if setting the last modified time fails.
+ * @param atime the access time to set (only if greater than zero).
+ * @throws IOException if setting the times fails.
*/
@Override
public void setTimes(Path p, long mtime, long atime) throws IOException {
- File f = pathToFile(p);
- if(mtime >= 0) {
- if(!f.setLastModified(mtime)) {
- throw new IOException(
- "couldn't set last-modified time to " +
- mtime +
- " for " +
- f.getAbsolutePath());
- }
- }
+ BasicFileAttributeView view = Files.getFileAttributeView(
+ pathToFile(p).toPath(), BasicFileAttributeView.class);
+ FileTime fmtime = (mtime >= 0) ? FileTime.fromMillis(mtime) : null;
+ FileTime fatime = (atime >= 0) ? FileTime.fromMillis(atime) : null;
+ view.setTimes(fmtime, fatime, null);
}
@Override
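The rewritten setTimes leans on BasicFileAttributeView#setTimes, which treats a null argument as "leave this timestamp unchanged"; that is what lets Hadoop's convention of passing a negative value for "don't touch" work for mtime and atime independently. A minimal sketch of the same pattern outside Hadoop (the temp file is illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.attribute.BasicFileAttributeView;
    import java.nio.file.attribute.FileTime;

    public class SetTimesSketch {
      static void setTimes(File f, long mtime, long atime) throws IOException {
        BasicFileAttributeView view = Files.getFileAttributeView(
            f.toPath(), BasicFileAttributeView.class);
        // null means "do not change this timestamp".
        FileTime fmtime = (mtime >= 0) ? FileTime.fromMillis(mtime) : null;
        FileTime fatime = (atime >= 0) ? FileTime.fromMillis(atime) : null;
        view.setTimes(fmtime, fatime, null);  // third arg: creation time
      }

      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("settimes", ".tmp");
        setTimes(f, 12345000L, -1);  // change mtime only
        setTimes(f, -1, 23456000L);  // change atime only
        System.out.println("mtime = " + f.lastModified());  // 12345000
        f.delete();
      }
    }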
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
index 4d6485d..8018946 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
@@ -1386,19 +1386,48 @@ public abstract class SymlinkBaseTest {
}
@Test(timeout=10000)
- /** setTimes affects the target not the link */
- public void testSetTimes() throws IOException {
+ /** setTimes affects the target file, not the link */
+ public void testSetTimesSymlinkToFile() throws IOException {
Path file = new Path(testBaseDir1(), "file");
Path link = new Path(testBaseDir1(), "linkToFile");
createAndWriteFile(file);
wrapper.createSymlink(file, link, false);
long at = wrapper.getFileLinkStatus(link).getAccessTime();
- wrapper.setTimes(link, 2L, 3L);
- // NB: local file systems don't implement setTimes
- if (!"file".equals(getScheme())) {
- assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
- assertEquals(3, wrapper.getFileStatus(file).getAccessTime());
- assertEquals(2, wrapper.getFileStatus(file).getModificationTime());
+ // the local file system may not support millisecond timestamps
+ wrapper.setTimes(link, 2000L, 3000L);
+ assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
+ assertEquals(2000, wrapper.getFileStatus(file).getModificationTime());
+ assertEquals(3000, wrapper.getFileStatus(file).getAccessTime());
+ }
+
+ @Test(timeout=10000)
+ /** setTimes affects the target directory, not the link */
+ public void testSetTimesSymlinkToDir() throws IOException {
+ Path dir = new Path(testBaseDir1(), "dir");
+ Path link = new Path(testBaseDir1(), "linkToDir");
+ wrapper.mkdir(dir, FileContext.DEFAULT_PERM, false);
+ wrapper.createSymlink(dir, link, false);
+ long at = wrapper.getFileLinkStatus(link).getAccessTime();
+ // the local file system may not support millisecond timestamps
+ wrapper.setTimes(link, 2000L, 3000L);
+ assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
+ assertEquals(2000, wrapper.getFileStatus(dir).getModificationTime());
+ assertEquals(3000, wrapper.getFileStatus(dir).getAccessTime());
+ }
+
+ @Test(timeout=10000)
+ /** setTimes fails on a dangling link and does not affect the link itself */
+ public void testSetTimesDanglingLink() throws IOException {
+ Path file = new Path("/noSuchFile");
+ Path link = new Path(testBaseDir1()+"/link");
+ wrapper.createSymlink(file, link, false);
+ long at = wrapper.getFileLinkStatus(link).getAccessTime();
+ try {
+ wrapper.setTimes(link, 2000L, 3000L);
+ fail("set times to non-existant file");
+ } catch (IOException e) {
+ // Expected
}
+ assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
}
}
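These tests depend on the fact that the java.nio attribute views follow symlinks unless LinkOption.NOFOLLOW_LINKS is supplied: setting times through the link updates the target, getFileLinkStatus still reports the link's own times, and a dangling link makes the call throw. A hedged sketch of that distinction (paths are illustrative; symlink creation may require extra privileges on Windows, which is why the local-FS tests skip that platform):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.LinkOption;
    import java.nio.file.Path;
    import java.nio.file.attribute.BasicFileAttributeView;
    import java.nio.file.attribute.FileTime;

    public class FollowLinkSketch {
      public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("links");
        Path target = Files.createFile(dir.resolve("file"));
        Path link = Files.createSymbolicLink(dir.resolve("link"), target);

        // The default view follows the link, so this changes the target.
        Files.getFileAttributeView(link, BasicFileAttributeView.class)
            .setTimes(FileTime.fromMillis(2000L),
                      FileTime.fromMillis(3000L), null);
        System.out.println(
            Files.getLastModifiedTime(target).toMillis());  // 2000

        // With NOFOLLOW_LINKS the view addresses the link itself.
        System.out.println(Files.getLastModifiedTime(
            link, LinkOption.NOFOLLOW_LINKS).toMillis());
      }
    }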
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index df5cba9..f641f04 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -378,7 +378,14 @@ public class TestLocalFileSystem {
assertTrue(dataFileFound);
assertTrue(checksumFileFound);
}
-
+
+ private void checkTimesStatus(Path path,
+ long expectedModTime, long expectedAccTime) throws IOException {
+ FileStatus status = fileSys.getFileStatus(path);
+ assertEquals(expectedModTime, status.getModificationTime());
+ assertEquals(expectedAccTime, status.getAccessTime());
+ }
+
@Test(timeout = 1000)
public void testSetTimes() throws Exception {
Path path = new Path(TEST_ROOT_DIR, "set-times");
@@ -387,15 +394,24 @@ public class TestLocalFileSystem {
// test only to the nearest second, as the raw FS may not
// support millisecond timestamps
long newModTime = 12345000;
+ long newAccTime = 23456000;
FileStatus status = fileSys.getFileStatus(path);
assertTrue("check we're actually changing something", newModTime != status.getModificationTime());
- long accessTime = status.getAccessTime();
+ assertTrue("check we're actually changing something", newAccTime != status.getAccessTime());
+
+ fileSys.setTimes(path, newModTime, newAccTime);
+ checkTimesStatus(path, newModTime, newAccTime);
+
+ newModTime = 34567000;
fileSys.setTimes(path, newModTime, -1);
- status = fileSys.getFileStatus(path);
- assertEquals(newModTime, status.getModificationTime());
- assertEquals(accessTime, status.getAccessTime());
+ checkTimesStatus(path, newModTime, newAccTime);
+
+ newAccTime = 45678000;
+
+ fileSys.setTimes(path, -1, newAccTime);
+ checkTimesStatus(path, newModTime, newAccTime);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
index 64e34af..602af97 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
@@ -231,4 +231,22 @@ abstract public class TestSymlinkLocalFS extends SymlinkBaseTest {
// Expected.
}
}
+
+ @Override
+ public void testSetTimesSymlinkToFile() throws IOException {
+ assumeTrue(!Path.WINDOWS);
+ super.testSetTimesSymlinkToFile();
+ }
+
+ @Override
+ public void testSetTimesSymlinkToDir() throws IOException {
+ assumeTrue(!Path.WINDOWS);
+ super.testSetTimesSymlinkToDir();
+ }
+
+ @Override
+ public void testSetTimesDanglingLink() throws IOException {
+ assumeTrue(!Path.WINDOWS);
+ super.testSetTimesDanglingLink();
+ }
}
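For readers unfamiliar with the JUnit idiom above: Assume.assumeTrue marks a test as skipped rather than failed when its condition is false, which is how these overrides cleanly disable the symlink-time tests on Windows. A minimal sketch:

    import static org.junit.Assume.assumeTrue;

    import org.junit.Test;

    public class AssumeSketch {
      @Test
      public void runsOnlyOffWindows() {
        // If the assumption fails, JUnit reports the test as skipped
        // and none of the following assertions run.
        assumeTrue(
            !System.getProperty("os.name").toLowerCase().contains("win"));
        // ... platform-specific assertions would go here ...
      }
    }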
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
index ecfb5a5..263c697 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
@@ -18,12 +18,13 @@
package org.apache.hadoop.fs.shell;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -38,8 +39,12 @@ import org.junit.Test;
public class TestCopyPreserveFlag {
private static final int MODIFICATION_TIME = 12345000;
- private static final Path FROM = new Path("d1", "f1");
- private static final Path TO = new Path("d2", "f2");
+ private static final int ACCESS_TIME = 23456000;
+ private static final Path DIR_FROM = new Path("d0");
+ private static final Path DIR_TO1 = new Path("d1");
+ private static final Path DIR_TO2 = new Path("d2");
+ private static final Path FROM = new Path(DIR_FROM, "f0");
+ private static final Path TO = new Path(DIR_TO1, "f1");
private static final FsPermission PERMISSIONS = new FsPermission(
FsAction.ALL,
FsAction.EXECUTE,
@@ -62,8 +67,8 @@ public class TestCopyPreserveFlag {
FileSystem.setDefaultUri(conf, fs.getUri());
fs.setWorkingDirectory(testDir);
- fs.mkdirs(new Path("d1"));
- fs.mkdirs(new Path("d2"));
+ fs.mkdirs(DIR_FROM);
+ fs.mkdirs(DIR_TO1);
fs.createNewFile(FROM);
FSDataOutputStream output = fs.create(FROM, true);
@@ -72,10 +77,10 @@ public class TestCopyPreserveFlag {
output.writeChar('\n');
}
output.close();
- fs.setTimes(FROM, MODIFICATION_TIME, 0);
+ fs.setTimes(FROM, MODIFICATION_TIME, ACCESS_TIME);
fs.setPermission(FROM, PERMISSIONS);
- fs.setTimes(new Path("d1"), MODIFICATION_TIME, 0);
- fs.setPermission(new Path("d1"), PERMISSIONS);
+ fs.setTimes(DIR_FROM, MODIFICATION_TIME, ACCESS_TIME);
+ fs.setPermission(DIR_FROM, PERMISSIONS);
}
@After
@@ -84,14 +89,18 @@ public class TestCopyPreserveFlag {
fs.close();
}
- private void assertAttributesPreserved() throws IOException {
- assertEquals(MODIFICATION_TIME, fs.getFileStatus(TO).getModificationTime());
- assertEquals(PERMISSIONS, fs.getFileStatus(TO).getPermission());
+ private void assertAttributesPreserved(Path to) throws IOException {
+ FileStatus status = fs.getFileStatus(to);
+ assertEquals(MODIFICATION_TIME, status.getModificationTime());
+ assertEquals(ACCESS_TIME, status.getAccessTime());
+ assertEquals(PERMISSIONS, status.getPermission());
}
- private void assertAttributesChanged() throws IOException {
- assertTrue(MODIFICATION_TIME != fs.getFileStatus(TO).getModificationTime());
- assertTrue(!PERMISSIONS.equals(fs.getFileStatus(TO).getPermission()));
+ private void assertAttributesChanged(Path to) throws IOException {
+ FileStatus status = fs.getFileStatus(to);
+ assertNotEquals(MODIFICATION_TIME, status.getModificationTime());
+ assertNotEquals(ACCESS_TIME, status.getAccessTime());
+ assertNotEquals(PERMISSIONS, status.getPermission());
}
private void run(CommandWithDestination cmd, String... args) {
@@ -102,54 +111,48 @@ public class TestCopyPreserveFlag {
@Test(timeout = 10000)
public void testPutWithP() throws Exception {
run(new Put(), "-p", FROM.toString(), TO.toString());
- assertAttributesPreserved();
+ assertAttributesPreserved(TO);
}
@Test(timeout = 10000)
public void testPutWithoutP() throws Exception {
run(new Put(), FROM.toString(), TO.toString());
- assertAttributesChanged();
+ assertAttributesChanged(TO);
}
@Test(timeout = 10000)
public void testGetWithP() throws Exception {
run(new Get(), "-p", FROM.toString(), TO.toString());
- assertAttributesPreserved();
+ assertAttributesPreserved(TO);
}
@Test(timeout = 10000)
public void testGetWithoutP() throws Exception {
run(new Get(), FROM.toString(), TO.toString());
- assertAttributesChanged();
+ assertAttributesChanged(TO);
}
@Test(timeout = 10000)
public void testCpWithP() throws Exception {
run(new Cp(), "-p", FROM.toString(), TO.toString());
- assertAttributesPreserved();
+ assertAttributesPreserved(TO);
}
@Test(timeout = 10000)
public void testCpWithoutP() throws Exception {
run(new Cp(), FROM.toString(), TO.toString());
- assertAttributesChanged();
+ assertAttributesChanged(TO);
}
@Test(timeout = 10000)
public void testDirectoryCpWithP() throws Exception {
- run(new Cp(), "-p", "d1", "d3");
- assertEquals(fs.getFileStatus(new Path("d1")).getModificationTime(),
- fs.getFileStatus(new Path("d3")).getModificationTime());
- assertEquals(fs.getFileStatus(new Path("d1")).getPermission(),
- fs.getFileStatus(new Path("d3")).getPermission());
+ run(new Cp(), "-p", DIR_FROM.toString(), DIR_TO2.toString());
+ assertAttributesPreserved(DIR_TO2);
}
@Test(timeout = 10000)
public void testDirectoryCpWithoutP() throws Exception {
- run(new Cp(), "d1", "d4");
- assertTrue(fs.getFileStatus(new Path("d1")).getModificationTime() !=
- fs.getFileStatus(new Path("d4")).getModificationTime());
- assertTrue(!fs.getFileStatus(new Path("d1")).getPermission()
- .equals(fs.getFileStatus(new Path("d4")).getPermission()));
+ run(new Cp(), DIR_FROM.toString(), DIR_TO2.toString());
+ assertAttributesChanged(DIR_TO2);
}
}
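The behavior under test corresponds to the FsShell -p flag exercised above: with -p, put, get, and cp carry the source's modification time, access time, and permission over to the destination; without it, the destination gets fresh attributes. A hedged sketch of driving the same copy programmatically (the /d0 and /d2 paths are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;
    import org.apache.hadoop.util.ToolRunner;

    public class PreserveFlagSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "-p" asks cp to preserve timestamps and permission,
        // mirroring what assertAttributesPreserved checks.
        int rc = ToolRunner.run(conf, new FsShell(conf),
            new String[] {"-cp", "-p", "/d0", "/d2"});
        System.exit(rc);
      }
    }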
[08/24] hadoop git commit: HDFS-8709. Clarify automatic sync in
FSEditLog#logEdit.
Posted by ar...@apache.org.
HDFS-8709. Clarify automatic sync in FSEditLog#logEdit.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5fddc517
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5fddc517
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5fddc517
Branch: refs/heads/HDFS-7240
Commit: 5fddc5177ddad07a735d49c15a63cfc5f74d0891
Parents: bff5999
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Jul 2 10:26:40 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Jul 2 10:26:40 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 ++
.../hadoop/hdfs/server/namenode/FSEditLog.java | 28 +++++++++++++-------
2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fddc517/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7b96c56..6678a3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -699,6 +699,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
(vinayakumarb)
+ HDFS-8709. Clarify automatic sync in FSEditLog#logEdit. (wang)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fddc517/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 1b0b572..939e841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -409,10 +409,14 @@ public class FSEditLog implements LogsPurgeable {
}
/**
- * Write an operation to the edit log. Do not sync to persistent
- * store yet.
+ * Write an operation to the edit log.
+ * <p/>
+ * Additionally, this will sync the edit log if required by the underlying
+ * edit stream's automatic sync policy (e.g. when the buffer is full, or
+ * if a time interval has elapsed).
*/
void logEdit(final FSEditLogOp op) {
+ boolean needsSync = false;
synchronized (this) {
assert isOpenForWrite() :
"bad state: " + state;
@@ -434,14 +438,16 @@ public class FSEditLog implements LogsPurgeable {
endTransaction(start);
// check if it is time to schedule an automatic sync
- if (!shouldForceSync()) {
- return;
+ needsSync = shouldForceSync();
+ if (needsSync) {
+ isAutoSyncScheduled = true;
}
- isAutoSyncScheduled = true;
}
- // sync buffered edit log entries to persistent store
- logSync();
+ // Sync the log if an automatic sync is required.
+ if (needsSync) {
+ logSync();
+ }
}
/**
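The shape of this fix is a common concurrency pattern: decide whether expensive I/O is needed while holding the monitor, record the decision in a local, and perform the I/O after releasing the lock so concurrent writers are not serialized behind a disk sync. A generic, hedged sketch of the pattern (names and threshold are hypothetical):

    public class AutoSyncSketch {
      private static final int SYNC_THRESHOLD = 512 * 1024;
      private final StringBuilder buffer = new StringBuilder();

      void logEdit(String op) {
        boolean needsSync;
        synchronized (this) {
          buffer.append(op);
          // Decide under the lock, but defer the I/O: holding the
          // monitor across a sync would block every other writer.
          needsSync = buffer.length() >= SYNC_THRESHOLD;
        }
        if (needsSync) {
          sync();  // flush to persistent storage outside the monitor
        }
      }

      void sync() { /* flush the buffer to disk; elided in this sketch */ }
    }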
@@ -1191,11 +1197,13 @@ public class FSEditLog implements LogsPurgeable {
throws IOException {
return journalSet.getEditLogManifest(fromTxId);
}
-
+
/**
* Finalizes the current edit log and opens a new log segment.
- * @return the transaction id of the BEGIN_LOG_SEGMENT transaction
- * in the new log.
+ *
+ * @param layoutVersion The layout version of the new edit log segment.
+ * @return the transaction id of the BEGIN_LOG_SEGMENT transaction in the new
+ * log.
*/
synchronized long rollEditLog(int layoutVersion) throws IOException {
LOG.info("Rolling edit logs");