You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/07 18:04:08 UTC

[01/24] hadoop git commit: YARN-3793. Several NPEs when deleting local files on NM recovery. Contributed by Varun Saxena

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 93cebb2e2 -> 83f39a32c


YARN-3793. Several NPEs when deleting local files on NM recovery. Contributed by Varun Saxena


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5cdf78e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5cdf78e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5cdf78e

Branch: refs/heads/HDFS-7240
Commit: b5cdf78e8e6cd6c5c1fb7286207dac72be32c0d6
Parents: eac1d18
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jul 1 21:13:32 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jul 1 21:13:32 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../logaggregation/AppLogAggregatorImpl.java    |  6 ++--
 .../TestLogAggregationService.java              | 36 ++++++++++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5cdf78e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4389e27..3620a71 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -598,6 +598,9 @@ Release 2.7.2 - UNRELEASED
 
   BUG FIXES
 
+    YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
+    Saxena via jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5cdf78e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 4b95a03..654eb0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -276,10 +276,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             aggregator.doContainerLogAggregation(writer, appFinished);
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
+          this.delService.delete(this.userUgi.getShortUserName(), null,
+              uploadedFilePathsInThisCycle
+                  .toArray(new Path[uploadedFilePathsInThisCycle.size()]));
         }
-        this.delService.delete(this.userUgi.getShortUserName(), null,
-          uploadedFilePathsInThisCycle
-            .toArray(new Path[uploadedFilePathsInThisCycle.size()]));
 
         // This container is finished, and all its logs have been uploaded,
         // remove it from containerLogAggregators.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5cdf78e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index fd97cef..6a3d270 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -280,6 +281,41 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     verifyLocalFileDeletion(logAggregationService);
   }
 
+  /* Test to verify fix for YARN-3793 */
+  @Test
+  public void testNoLogsUploadedOnAppFinish() throws Exception {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    delSrvc = spy(delSrvc);
+    this.delSrvc.init(conf);
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+
+    LogAggregationService logAggregationService = new LogAggregationService(
+        dispatcher, this.context, this.delSrvc, super.dirsHandler);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId app = BuilderUtils.newApplicationId(1234, 1);
+    File appLogDir = new File(localLogDir, ConverterUtils.toString(app));
+    appLogDir.mkdir();
+    LogAggregationContext context =
+        LogAggregationContext.newInstance("HOST*", "sys*");
+    logAggregationService.handle(new LogHandlerAppStartedEvent(app, this.user,
+        null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, context));
+
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(app, 1);
+    ContainerId cont = BuilderUtils.newContainerId(appAttemptId, 1);
+    writeContainerLogs(appLogDir, cont, new String[] { "stdout",
+        "stderr", "syslog" });
+    logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0));
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(app));
+    logAggregationService.stop();
+    delSrvc.stop();
+    // Aggregated logs should not be deleted if not uploaded.
+    verify(delSrvc, times(0)).delete(user, null);
+  }
 
   @Test
   public void testNoContainerOnNode() throws Exception {


[06/24] hadoop git commit: YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id properly in log. Contributed by Bibin A Chundatt.

Posted by ar...@apache.org.
YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id
properly in log. Contributed by Bibin A Chundatt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/37d73957
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/37d73957
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/37d73957

Branch: refs/heads/HDFS-7240
Commit: 37d7395773b5bb24aa522db38a2602df9a5ac184
Parents: f379622
Author: Devaraj K <de...@apache.org>
Authored: Thu Jul 2 10:20:31 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Thu Jul 2 10:20:31 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                          |  3 +++
 .../resourcemanager/scheduler/fair/FSSchedulerNode.java  | 11 ++++++-----
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/37d73957/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6839548..2009b47 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -586,6 +586,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3830. AbstractYarnScheduler.createReleaseCache may try to clean a null
     attempt. (nijel via devaraj)
 
+    YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id
+    properly in log. (Bibin A Chundatt via devaraj)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37d73957/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index be08dff..c86201a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -68,12 +68,13 @@ public class FSSchedulerNode extends SchedulerNode {
             " on node " + this);
       }
 
-      LOG.info("Updated reserved container " + 
-          container.getContainer().getId() + " on node " + 
-          this + " for application " + application);
+      LOG.info("Updated reserved container " + container.getContainer().getId()
+          + " on node " + this + " for application "
+          + application.getApplicationId());
     } else {
-      LOG.info("Reserved container " + container.getContainer().getId() + 
-          " on node " + this + " for application " + application);
+      LOG.info("Reserved container " + container.getContainer().getId()
+          + " on node " + this + " for application "
+          + application.getApplicationId());
     }
     setReservedContainer(container);
     this.reservedAppSchedulable = (FSAppAttempt) application;


[20/24] hadoop git commit: YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong prefix for auth options. Contributed by Bibin A Chundatt.

Posted by ar...@apache.org.
YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong
prefix for auth options. Contributed by Bibin A Chundatt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/af63427c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/af63427c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/af63427c

Branch: refs/heads/HDFS-7240
Commit: af63427c6d7d2fc251eafb1f152b7a90c5bd07e5
Parents: 81f3644
Author: Devaraj K <de...@apache.org>
Authored: Tue Jul 7 12:06:30 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Tue Jul 7 12:06:30 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                                 | 3 +++
 .../security/TimelineAuthenticationFilterInitializer.java       | 5 ++---
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/af63427c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f35be75..8f63187 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -592,6 +592,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner
     after create them. (zhihai xu via xgong)
 
+    YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong
+    prefix for auth options. (Bibin A Chundatt via devaraj)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af63427c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
index a3c136c..4e7c29a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
@@ -62,9 +62,8 @@ public class TimelineAuthenticationFilterInitializer extends FilterInitializer {
    * Initializes {@link TimelineAuthenticationFilter}
    * <p>
    * Propagates to {@link TimelineAuthenticationFilter} configuration all YARN
-   * configuration properties prefixed with
-   * {@code yarn.timeline-service.authentication.}
-   * 
+   * configuration properties prefixed with {@value #PREFIX}
+   *
    * @param container
    *          The filter container
    * @param conf


[21/24] hadoop git commit: HADOOP-12117. Potential NPE from Configuration#loadProperty with allowNullValueProperties set. (Contributed by zhihai xu)

Posted by ar...@apache.org.
HADOOP-12117. Potential NPE from Configuration#loadProperty with allowNullValueProperties set. (Contributed by zhihai xu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/99c8c583
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/99c8c583
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/99c8c583

Branch: refs/heads/HDFS-7240
Commit: 99c8c5839b65666e6099116e4d7024e0eb4682b9
Parents: af63427
Author: Vinayakumar B <vi...@apache.org>
Authored: Tue Jul 7 16:11:27 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Tue Jul 7 16:11:27 2015 +0530

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt      |  3 +++
 .../java/org/apache/hadoop/conf/Configuration.java   |  8 ++++----
 .../org/apache/hadoop/conf/TestConfiguration.java    | 15 +++++++++++++++
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c8c583/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index faf5a5c..5d11db9 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -933,6 +933,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
     multiple times (zhihai xu via vinayakumarb)
 
+    HADOOP-12117. Potential NPE from Configuration#loadProperty with
+    allowNullValueProperties set. (zhihai xu via vinayakumarb)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c8c583/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
index 54e07c6..0b45429 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
@@ -2735,14 +2735,14 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       to.put(entry.getKey(), entry.getValue());
     }
   }
-  
+
   private void loadProperty(Properties properties, String name, String attr,
       String value, boolean finalParameter, String[] source) {
     if (value != null || allowNullValueProperties) {
+      if (value == null) {
+        value = DEFAULT_STRING_CHECK;
+      }
       if (!finalParameters.contains(attr)) {
-        if (value==null && allowNullValueProperties) {
-          value = DEFAULT_STRING_CHECK;
-        }
         properties.setProperty(attr, value);
         if(source != null) {
           updatingResource.put(attr, source);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c8c583/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
index ec6c964..a039741 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
@@ -42,6 +42,7 @@ import static java.util.concurrent.TimeUnit.*;
 
 import junit.framework.TestCase;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
+
 import org.codehaus.jackson.map.ObjectMapper;
 import org.mockito.Mockito;
 
@@ -1511,6 +1513,19 @@ public class TestConfiguration extends TestCase {
     // it's expected behaviour.
   }
 
+  public void testNullValueProperties() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setAllowNullValueProperties(true);
+    out = new BufferedWriter(new FileWriter(CONFIG));
+    startConfig();
+    appendProperty("attr", "value", true);
+    appendProperty("attr", "", true);
+    endConfig();
+    Path fileResource = new Path(CONFIG);
+    conf.addResource(fileResource);
+    assertEquals("value", conf.get("attr"));
+  }
+
   public static void main(String[] argv) throws Exception {
     junit.textui.TestRunner.main(new String[]{
       TestConfiguration.class.getName()


[02/24] hadoop git commit: HDFS-8666. Speedup the TestMover test. Contributed by Walter Su.

Posted by ar...@apache.org.
HDFS-8666. Speedup the TestMover test. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/152e5df3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/152e5df3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/152e5df3

Branch: refs/heads/HDFS-7240
Commit: 152e5df3b65394c2939d6de3c4a649a207bb58d3
Parents: b5cdf78
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Jul 1 14:59:50 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Jul 1 14:59:50 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt      |  2 ++
 .../hadoop/hdfs/server/mover/TestMover.java      | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/152e5df3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3736932..ec37542 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -694,6 +694,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8635. Migrate HDFS native build to new CMake framework (Alan Burlison
     via Colin P. McCabe)
 
+    HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/152e5df3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 49e2b23..899b5c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -46,6 +47,21 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class TestMover {
+
+  static final int DEFAULT_BLOCK_SIZE = 100;
+
+  static {
+    TestBalancer.initTestSetup();
+  }
+
+  static void initConf(Configuration conf) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+  }
+
   static Mover newMover(Configuration conf) throws IOException {
     final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
     Assert.assertEquals(1, namenodes.size());
@@ -97,6 +113,7 @@ public class TestMover {
   @Test
   public void testScheduleBlockWithinSameNode() throws Exception {
     final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
@@ -285,6 +302,7 @@ public class TestMover {
   public void testTwoReplicaSameStorageTypeShouldNotSelect() throws Exception {
     // HDFS-8147
     final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
@@ -361,6 +379,7 @@ public class TestMover {
   public void testMoverFailedRetry() throws Exception {
     // HDFS-8147
     final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
     conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)


[19/24] hadoop git commit: HADOOP-11974. Fix FIONREAD #include on Solaris (Alan Burlison via Colin P. McCabe)

Posted by ar...@apache.org.
HADOOP-11974. Fix FIONREAD #include on Solaris (Alan Burlison via Colin P. McCabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81f36443
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81f36443
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81f36443

Branch: refs/heads/HDFS-7240
Commit: 81f364437608b21e85fc393f63546bf8b425ac71
Parents: bf89ddb
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Jul 6 12:56:34 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Mon Jul 6 17:17:59 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt             | 3 +++
 .../native/src/org/apache/hadoop/net/unix/DomainSocket.c    | 9 ++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f36443/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index fab78d4..faf5a5c 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -678,6 +678,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12045. Enable LocalFileSystem#setTimes to change atime.
     (Kazuho Fujii via cnauroth)
 
+    HADOOP-11974. Fix FIONREAD #include on Solaris (Alan Burlison via Colin P.
+    McCabe)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f36443/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
index a3f27ee..e658d8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
@@ -31,7 +31,14 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <sys/ioctl.h> /* for FIONREAD */
+
+/* For FIONREAD */
+#if defined(__sun)
+#include <sys/filio.h>
+#else
+#include <sys/ioctl.h>
+#endif
+
 #include <sys/socket.h>
 #include <sys/stat.h>
 #include <sys/types.h>


[07/24] hadoop git commit: HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)

Posted by ar...@apache.org.
HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff5999d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff5999d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff5999d

Branch: refs/heads/HDFS-7240
Commit: bff5999d07e9416a22846c849487e509ede55040
Parents: 37d7395
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jul 2 16:11:50 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Thu Jul 2 16:11:50 2015 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   2 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 233 +++++++++++++------
 .../hadoop/hdfs/TestDFSClientRetries.java       |   2 +-
 4 files changed, 169 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ec37542..7b96c56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -696,6 +696,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
 
+    HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
+    (vinayakumarb)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index f4ceab3..4923a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1181,7 +1181,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     //    Get block info from namenode
     TraceScope scope = getPathTraceScope("newDFSInputStream", src);
     try {
-      return new DFSInputStream(this, src, verifyChecksum);
+      return new DFSInputStream(this, src, verifyChecksum, null);
     } finally {
       scope.close();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6563d7b..7f3722f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -94,35 +95,35 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
-  private final DFSClient dfsClient;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private final String src;
-  private final boolean verifyChecksum;
+  protected final DFSClient dfsClient;
+  protected AtomicBoolean closed = new AtomicBoolean(false);
+  protected final String src;
+  protected final boolean verifyChecksum;
 
   // state by stateful read only:
   // (protected by lock on this)
   /////
   private DatanodeInfo currentNode = null;
-  private LocatedBlock currentLocatedBlock = null;
-  private long pos = 0;
-  private long blockEnd = -1;
+  protected LocatedBlock currentLocatedBlock = null;
+  protected long pos = 0;
+  protected long blockEnd = -1;
   private BlockReader blockReader = null;
   ////
 
   // state shared by stateful and positional read:
   // (protected by lock on infoLock)
   ////
-  private LocatedBlocks locatedBlocks = null;
+  protected LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private FileEncryptionInfo fileEncryptionInfo = null;
-  private CachingStrategy cachingStrategy;
+  protected CachingStrategy cachingStrategy;
   ////
 
-  private final ReadStatistics readStatistics = new ReadStatistics();
+  protected final ReadStatistics readStatistics = new ReadStatistics();
   // lock for state shared between read and pread
   // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
   //       (it's OK to acquire this lock when the lock on <this> is held)
-  private final Object infoLock = new Object();
+  protected final Object infoLock = new Object();
 
   /**
    * Track the ByteBuffers that we have handed out to readers.
@@ -239,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * back to the namenode to get a new list of block locations, and is
    * capped at maxBlockAcquireFailures
    */
-  private int failures = 0;
+  protected int failures = 0;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -252,24 +253,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     deadNodes.put(dnInfo, dnInfo);
   }
   
-  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
-                 ) throws IOException, UnresolvedLinkException {
+  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+      LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
     this.dfsClient = dfsClient;
     this.verifyChecksum = verifyChecksum;
     this.src = src;
     synchronized (infoLock) {
       this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
     }
-    openInfo();
+    this.locatedBlocks = locatedBlocks;
+    openInfo(false);
   }
 
   /**
    * Grab the open-file info from namenode
+   * @param refreshLocatedBlocks whether to re-fetch locatedblocks
    */
-  void openInfo() throws IOException, UnresolvedLinkException {
+  void openInfo(boolean refreshLocatedBlocks) throws IOException,
+      UnresolvedLinkException {
     final DfsClientConf conf = dfsClient.getConf();
     synchronized(infoLock) {
-      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+      lastBlockBeingWrittenLength =
+          fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
       int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
       while (retriesForLastBlockLength > 0) {
         // Getting last block length as -1 is a special case. When cluster
@@ -281,7 +286,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
               + "Datanodes might not have reported blocks completely."
               + " Will retry for " + retriesForLastBlockLength + " times");
           waitFor(conf.getRetryIntervalForGetLastBlockLength());
-          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+          lastBlockBeingWrittenLength =
+              fetchLocatedBlocksAndGetLastBlockLength(true);
         } else {
           break;
         }
@@ -302,8 +308,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
-    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
+  private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+      throws IOException {
+    LocatedBlocks newInfo = locatedBlocks;
+    if (locatedBlocks == null || refresh) {
+      newInfo = dfsClient.getLocatedBlocks(src, 0);
+    }
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
     }
@@ -441,7 +451,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return located block
    * @throws IOException
    */
-  private LocatedBlock getBlockAt(long offset) throws IOException {
+  protected LocatedBlock getBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       assert (locatedBlocks != null) : "locatedBlocks is null";
 
@@ -476,7 +486,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Fetch a block from namenode and cache it */
-  private void fetchBlockAt(long offset) throws IOException {
+  protected void fetchBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
@@ -579,7 +589,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
 
     // Will be getting a new BlockReader.
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
 
     //
     // Connect to best DataNode for desired Block, with potential offset
@@ -620,7 +630,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         return chosenNode;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
               + " : " + ex);
           // The encryption key used is invalid.
@@ -631,8 +641,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           fetchBlockAt(target);
         } else {
           connectFailedOnce = true;
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block "
-            +targetBlock.getBlock()+ ", add to deadNodes and continue. " + ex, ex);
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+            + ", add to deadNodes and continue. " + ex, ex);
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }
@@ -696,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           "unreleased ByteBuffers allocated by read().  " +
           "Please release " + builder.toString() + ".");
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
     super.close();
   }
 
@@ -713,12 +723,22 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Wraps different possible read implementations so that readBuffer can be
    * strategy-agnostic.
    */
-  private interface ReaderStrategy {
+  interface ReaderStrategy {
     public int doRead(BlockReader blockReader, int off, int len)
         throws ChecksumException, IOException;
+
+    /**
+     * Copy data from the src ByteBuffer into the read buffer.
+     * @param src The src buffer where the data is copied from
+     * @param offset Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the offset of the byte array for copy.
+     * @param length Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the length of the data to copy.
+     */
+    public int copyFrom(ByteBuffer src, int offset, int length);
   }
 
-  private void updateReadStatistics(ReadStatistics readStatistics, 
+  protected void updateReadStatistics(ReadStatistics readStatistics,
         int nRead, BlockReader blockReader) {
     if (nRead <= 0) return;
     synchronized(infoLock) {
@@ -749,12 +769,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       updateReadStatistics(readStatistics, nRead, blockReader);
       return nRead;
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      writeSlice.get(buf, offset, length);
+      return length;
+    }
   }
 
   /**
    * Used to read bytes into a user-supplied ByteBuffer
    */
-  private class ByteBufferStrategy implements ReaderStrategy {
+  protected class ByteBufferStrategy implements ReaderStrategy {
     final ByteBuffer buf;
     ByteBufferStrategy(ByteBuffer buf) {
       this.buf = buf;
@@ -770,6 +797,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         int ret = blockReader.read(buf);
         success = true;
         updateReadStatistics(readStatistics, ret, blockReader);
+        if (ret == 0) {
+          DFSClient.LOG.warn("zero");
+        }
         return ret;
       } finally {
         if (!success) {
@@ -779,6 +809,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         }
       } 
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+      writeSlice.limit(writeSlice.position() + remaining);
+      buf.put(writeSlice);
+      return remaining;
+    }
   }
 
   /* This is a used by regular read() and handles ChecksumExceptions.
@@ -837,7 +876,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -926,7 +965,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Add corrupted block replica into map.
    */
-  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, 
+  protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     Set<DatanodeInfo> dnSet = null;
     if((corruptedBlockMap.containsKey(blk))) {
@@ -985,8 +1024,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         } catch (InterruptedException iex) {
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
-        openInfo();
-        block = getBlockAt(block.getStartOffset());
+        openInfo(true);
+        block = refreshLocatedBlock(block);
         failures++;
       }
     }
@@ -998,7 +1037,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param ignoredNodes Do not choose nodes in this array (may be null)
    * @return The DNAddrPair of the best node. Null if no node can be chosen.
    */
-  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+  protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) {
     DatanodeInfo[] nodes = block.getLocations();
     StorageType[] storageTypes = block.getStorageTypes();
@@ -1058,15 +1097,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return errMsgr.toString();
   }
 
-  private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       try {
-        actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+        actualGetFromOneDataNode(addressPair, block, start, end,
             buf, offset, corruptedBlockMap);
         return;
       } catch (IOException e) {
@@ -1077,7 +1116,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
-      final long blockStartOffset, final long start, final long end,
+      final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -1090,7 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         TraceScope scope =
             Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+          actualGetFromOneDataNode(datanode, block, start, end, buf,
               offset, corruptedBlockMap);
           return bb;
         } finally {
@@ -1100,31 +1139,60 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     };
   }
 
+  /**
+   * Used when reading contiguous blocks
+   */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
-      long blockStartOffset, final long start, final long end, byte[] buf,
+      LocatedBlock block, final long start, final long end, byte[] buf,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
+    final int length = (int) (end - start + 1);
+    actualGetFromOneDataNode(datanode, block, start, end, buf,
+        new int[]{offset}, new int[]{length}, corruptedBlockMap);
+  }
+
+  /**
+   * Read data from one DataNode.
+   * @param datanode the datanode from which to read data
+   * @param block the located block containing the requested data
+   * @param startInBlk the first offset within the block to read
+   * @param endInBlk the last offset within the block to read (inclusive)
+   * @param buf the given byte array into which the data is read
+   * @param offsets the data may be read into multiple segments of the buf
+   *                (when reading a striped block). this array indicates the
+   *                offset of each buf segment.
+   * @param lengths the length of each buf segment
+   * @param corruptedBlockMap map recording list of datanodes with corrupted
+   *                          block replica
+   */
+  void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long startInBlk, final long endInBlk,
+      byte[] buf, int[] offsets, int[] lengths,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
+    final int len = (int) (endInBlk - startInBlk + 1);
+    checkReadPortions(offsets, lengths, len);
 
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      LocatedBlock block = getBlockAt(blockStartOffset);
+      block = refreshLocatedBlock(block);
       BlockReader reader = null;
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
-        int len = (int) (end - start + 1);
-        reader = getBlockReader(block, start, len, datanode.addr,
+        reader = getBlockReader(block, startInBlk, len, datanode.addr,
             datanode.storageType, datanode.info);
-        int nread = reader.readAll(buf, offset, len);
-        updateReadStatistics(readStatistics, nread, reader);
-
-        if (nread != len) {
-          throw new IOException("truncated return from reader.read(): " +
-                                "excpected " + len + ", got " + nread);
+        for (int i = 0; i < offsets.length; i++) {
+          int nread = reader.readAll(buf, offsets[i], lengths[i]);
+          updateReadStatistics(readStatistics, nread, reader);
+          if (nread != lengths[i]) {
+            throw new IOException("truncated return from reader.read(): " +
+                "excpected " + lengths[i] + ", got " + nread);
+          }
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
@@ -1169,11 +1237,40 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
-   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+   * Refresh cached block locations.
+   * @param block The currently cached block locations
+   * @return Refreshed block locations
+   * @throws IOException
+   */
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    return getBlockAt(block.getStartOffset());
+  }
+
+  /**
+   * This method verifies that the read portions are valid and do not overlap
+   * with each other.
+   */
+  private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+    Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
+    int sum = 0;
+    for (int i = 0; i < lengths.length; i++) {
+      if (i > 0) {
+        int gap = offsets[i] - offsets[i - 1];
+        // make sure read portions do not overlap with each other
+        Preconditions.checkArgument(gap >= lengths[i - 1]);
+      }
+      sum += lengths[i];
+    }
+    Preconditions.checkArgument(sum == totalLen);
+  }
+
+  /**
+   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
    * 'hedged' read if the first read is taking longer than configured amount of
    * time. We then wait on which ever read returns first.
    */
-  private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1186,7 +1283,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     int hedgedReadId = 0;
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1198,7 +1295,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block.getStartOffset(), start, end, bb,
+            chosenNode, block, start, end, bb,
             corruptedBlockMap, hedgedReadId++);
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
@@ -1235,7 +1332,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           }
           bb = ByteBuffer.allocate(len);
           Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-              chosenNode, block.getStartOffset(), start, end, bb,
+              chosenNode, block, start, end, bb,
               corruptedBlockMap, hedgedReadId++);
           Future<ByteBuffer> oneMoreRequest = hedgedService
               .submit(getFromDataNodeCallable);
@@ -1319,7 +1416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return true if block access token has expired or invalid and it should be
    *         refetched
    */
-  private static boolean tokenRefetchNeeded(IOException ex,
+  protected static boolean tokenRefetchNeeded(IOException ex,
       InetSocketAddress targetAddr) {
     /*
      * Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1389,13 +1486,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
         if (dfsClient.isHedgedReadsEnabled()) {
-          hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          hedgedFetchBlockByteRange(blk, targetStart,
+              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
         } else {
-          fetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
@@ -1427,7 +1522,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
-  private void reportCheckSumFailure(
+  protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 
       int dataNodeCount) {
     if (corruptedBlockMap.isEmpty()) {
@@ -1556,7 +1651,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    */
   @Override
-  public synchronized long getPos() throws IOException {
+  public synchronized long getPos() {
     return pos;
   }
 
@@ -1590,7 +1685,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Utility class to encapsulate data node info and its address. */
-  private static final class DNAddrPair {
+  static final class DNAddrPair {
     final DatanodeInfo info;
     final InetSocketAddress addr;
     final StorageType storageType;
@@ -1627,7 +1722,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private void closeCurrentBlockReader() {
+  protected void closeCurrentBlockReaders() {
     if (blockReader == null) return;
     // Close the current block reader so that the new caching settings can 
     // take effect immediately.
@@ -1647,7 +1742,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   @Override
@@ -1657,7 +1752,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   /**
@@ -1815,6 +1910,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   @Override
   public synchronized void unbuffer() {
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 68cc155..43e0eb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -343,7 +343,7 @@ public class TestDFSClientRetries {
       // we're starting a new operation on the user level.
       doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
         .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
-      is.openInfo();
+      is.openInfo(true);
       // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
       // just keep reading on the existing stream and the fact that we've poisoned
       // the block info won't do anything.


[09/24] hadoop git commit: HADOOP-12173. NetworkTopology::add calls toString always. Contributed by Inigo Goiri

Posted by ar...@apache.org.
HADOOP-12173. NetworkTopology::add calls toString always. Contributed by Inigo Goiri


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e59f6fad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e59f6fad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e59f6fad

Branch: refs/heads/HDFS-7240
Commit: e59f6fad6a8849cfab6acbf012f338d9cc7dd63c
Parents: 5fddc51
Author: Chris Douglas <cd...@apache.org>
Authored: Thu Jul 2 21:39:48 2015 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Thu Jul 2 21:39:48 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/hadoop/net/NetworkTopology.java      | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e59f6fad/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index c60cc0b..63b6763 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -396,14 +396,13 @@ public class NetworkTopology {
     int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
     netlock.writeLock().lock();
     try {
-      String oldTopoStr = this.toString();
       if( node instanceof InnerNode ) {
         throw new IllegalArgumentException(
           "Not allow to add an inner node: "+NodeBase.getPath(node));
       }
       if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
         LOG.error("Error: can't add leaf node " + NodeBase.getPath(node) +
-            " at depth " + newDepth + " to topology:\n" + oldTopoStr);
+            " at depth " + newDepth + " to topology:\n" + this.toString());
         throw new InvalidTopologyException("Failed to add " + NodeBase.getPath(node) +
             ": You cannot have a rack and a non-rack node at the same " +
             "level of the network topology.");


[22/24] hadoop git commit: YARN-2194. Fix bug causing CGroups functionality to fail on RHEL7. Contributed by Wei Yan.

Posted by ar...@apache.org.
YARN-2194. Fix bug causing CGroups functionality to fail on RHEL7. Contributed by Wei Yan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c40bdb56
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c40bdb56
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c40bdb56

Branch: refs/heads/HDFS-7240
Commit: c40bdb56a79fe1499c2284d493edc84620c0c078
Parents: 99c8c58
Author: Varun Vasudev <vv...@apache.org>
Authored: Tue Jul 7 16:59:29 2015 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Tue Jul 7 16:59:29 2015 +0530

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/util/StringUtils.java  |  8 ++++++++
 .../java/org/apache/hadoop/util/TestStringUtils.java   |  4 ++++
 hadoop-yarn-project/CHANGES.txt                        |  3 +++
 .../server/nodemanager/LinuxContainerExecutor.java     | 12 ++++++++----
 .../linux/privileged/PrivilegedOperation.java          |  1 +
 .../linux/privileged/PrivilegedOperationExecutor.java  |  2 +-
 .../nodemanager/util/CgroupsLCEResourcesHandler.java   |  6 ++++--
 .../native/container-executor/impl/configuration.c     |  8 ++++----
 .../container-executor/test/test-container-executor.c  |  4 ++--
 .../TestLinuxContainerExecutorWithMocks.java           | 13 +++++++++----
 10 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index fc4b0ab..73f9c4f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -872,6 +872,10 @@ public class StringUtils {
     return sb.toString();
   }
 
+  public static String join(char separator, Iterable<?> strings) {
+    return join(separator + "", strings);
+  }
+
   /**
    * Concatenates strings, using a separator.
    *
@@ -894,6 +898,10 @@ public class StringUtils {
     return sb.toString();
   }
 
+  public static String join(char separator, String[] strings) {
+    return join(separator + "", strings);
+  }
+
   /**
    * Convert SOME_STUFF to SomeStuff
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
index 85ab8c4..e3e613c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringUtils.java
@@ -278,8 +278,12 @@ public class TestStringUtils extends UnitTestcaseTimeLimit {
     s.add("c");
     assertEquals("", StringUtils.join(":", s.subList(0, 0)));
     assertEquals("a", StringUtils.join(":", s.subList(0, 1)));
+    assertEquals("", StringUtils.join(':', s.subList(0, 0)));
+    assertEquals("a", StringUtils.join(':', s.subList(0, 1)));
     assertEquals("a:b", StringUtils.join(":", s.subList(0, 2)));
     assertEquals("a:b:c", StringUtils.join(":", s.subList(0, 3)));
+    assertEquals("a:b", StringUtils.join(':', s.subList(0, 2)));
+    assertEquals("a:b:c", StringUtils.join(':', s.subList(0, 3)));
   }
   
   @Test (timeout = 30000)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8f63187..2d1d6a2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -595,6 +595,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3837. javadocs of TimelineAuthenticationFilterInitializer give wrong
     prefix for auth options. (Bibin A Chundatt via devaraj)
 
+    YARN-2194. Fix bug causing CGroups functionality to fail on RHEL7.
+    (Wei Yan via vvasudev)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index dbe257d..b936969 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -241,8 +241,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
                    Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
                    appId,
                    nmPrivateContainerTokensPath.toUri().getPath().toString(),
-                   StringUtils.join(",", localDirs),
-                   StringUtils.join(",", logDirs)));
+                   StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+                       localDirs),
+                   StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+                       logDirs)));
 
     File jvm =                                  // use same jvm as parent
       new File(new File(System.getProperty("java.home"), "bin"), "java");
@@ -363,8 +365,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
             nmPrivateContainerScriptPath.toUri().getPath().toString(),
             nmPrivateTokensPath.toUri().getPath().toString(),
             pidFilePath.toString(),
-            StringUtils.join(",", localDirs),
-            StringUtils.join(",", logDirs),
+            StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+                localDirs),
+            StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+                logDirs),
             resourcesOptions));
 
         if (tcCommandFile != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
index 74556a8..f220cbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
@@ -36,6 +36,7 @@ import java.util.List;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class PrivilegedOperation {
+  public final static char LINUX_FILE_PATH_SEPARATOR = '%';
 
   public enum OperationType {
     CHECK_SETUP("--checksetup"),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
index 1c4a51c..6fe0f5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
@@ -234,7 +234,7 @@ public class PrivilegedOperationExecutor {
 
       if (noneArgsOnly == false) {
         //We have already appended at least one tasks file.
-        finalOpArg.append(",");
+        finalOpArg.append(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR);
         finalOpArg.append(tasksFile);
       } else {
         finalOpArg.append(tasksFile);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
index b38e559..6994fc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -409,10 +410,11 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
 
     if (isCpuWeightEnabled()) {
       sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks");
-      sb.append(",");
+      sb.append(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR);
     }
 
-    if (sb.charAt(sb.length() - 1) == ',') {
+    if (sb.charAt(sb.length() - 1) ==
+        PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR) {
       sb.deleteCharAt(sb.length() - 1);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
index 15b53ae..51adc97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
@@ -283,7 +283,7 @@ char * get_value(const char* key) {
 
 /**
  * Function to return an array of values for a key.
- * Value delimiter is assumed to be a comma.
+ * Value delimiter is assumed to be a '%'.
  */
 char ** get_values(const char * key) {
   char *value = get_value(key);
@@ -291,7 +291,7 @@ char ** get_values(const char * key) {
 }
 
 /**
- * Extracts array of values from the comma separated list of values.
+ * Extracts array of values from the '%' separated list of values.
  */
 char ** extract_values(char *value) {
   char ** toPass = NULL;
@@ -303,14 +303,14 @@ char ** extract_values(char *value) {
   //first allocate any array of 10
   if(value != NULL) {
     toPass = (char **) malloc(sizeof(char *) * toPassSize);
-    tempTok = strtok_r((char *)value, ",", &tempstr);
+    tempTok = strtok_r((char *)value, "%", &tempstr);
     while (tempTok != NULL) {
       toPass[size++] = tempTok;
       if(size == toPassSize) {
         toPassSize += MAX_SIZE;
         toPass = (char **) realloc(toPass,(sizeof(char *) * toPassSize));
       }
-      tempTok = strtok_r(NULL, ",", &tempstr);
+      tempTok = strtok_r(NULL, "%", &tempstr);
     }
   }
   if (toPass != NULL) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
index be6cc49..13604f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
@@ -30,8 +30,8 @@
 
 #define TEST_ROOT "/tmp/test-container-executor"
 #define DONT_TOUCH_FILE "dont-touch-me"
-#define NM_LOCAL_DIRS       TEST_ROOT "/local-1," TEST_ROOT "/local-2," \
-               TEST_ROOT "/local-3," TEST_ROOT "/local-4," TEST_ROOT "/local-5"
+#define NM_LOCAL_DIRS       TEST_ROOT "/local-1%" TEST_ROOT "/local-2%" \
+               TEST_ROOT "/local-3%" TEST_ROOT "/local-4%" TEST_ROOT "/local-5"
 #define NM_LOG_DIRS         TEST_ROOT "/logs/userlogs"
 #define ARRAY_SIZE 1000
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c40bdb56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
index d48ce13..82b7fd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -148,8 +149,10 @@ public class TestLinuxContainerExecutorWithMocks {
     assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
         appSubmitter, cmd, appId, containerId,
         workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
-        StringUtils.join(",", dirsHandler.getLocalDirs()),
-        StringUtils.join(",", dirsHandler.getLogDirs()), "cgroups=none"),
+        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+            dirsHandler.getLocalDirs()),
+        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+            dirsHandler.getLogDirs()), "cgroups=none"),
         readMockParams());
     
   }
@@ -312,8 +315,10 @@ public class TestLinuxContainerExecutorWithMocks {
     assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
         appSubmitter, cmd, appId, containerId,
         workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
-        StringUtils.join(",", dirsHandler.getLocalDirs()),
-        StringUtils.join(",", dirsHandler.getLogDirs()),
+        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+            dirsHandler.getLocalDirs()),
+        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+            dirsHandler.getLogDirs()),
         "cgroups=none"), readMockParams());
 
   }


[13/24] hadoop git commit: HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu)

Posted by ar...@apache.org.
HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/233cab89
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/233cab89
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/233cab89

Branch: refs/heads/HDFS-7240
Commit: 233cab89adb6bae21d7e171f2af516b92266242c
Parents: bff67df
Author: Vinayakumar B <vi...@apache.org>
Authored: Mon Jul 6 15:39:43 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Mon Jul 6 15:39:43 2015 +0530

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 ++
 .../apache/hadoop/ha/ActiveStandbyElector.java  | 20 +++++++++++--
 .../hadoop/ha/TestActiveStandbyElector.java     | 31 ++++++++++++++++++++
 3 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/233cab89/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 312a996..1d737e5 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -924,6 +924,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12164. Fix TestMove and TestFsShellReturnCode failed to get command
     name using reflection. (Lei (Eddy) Xu)
 
+    HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
+    multiple times (zhihai xu via vinayakumarb)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233cab89/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
index e520a16..e458181 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
@@ -173,7 +173,9 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
 
   private Lock sessionReestablishLockForTests = new ReentrantLock();
   private boolean wantToBeInElection;
-  
+  private boolean monitorLockNodePending = false;
+  private ZooKeeper monitorLockNodeClient;
+
   /**
    * Create a new ActiveStandbyElector object <br/>
    * The elector is created by providing to it the Zookeeper configuration, the
@@ -468,7 +470,8 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
   public synchronized void processResult(int rc, String path, Object ctx,
       Stat stat) {
     if (isStaleClient(ctx)) return;
-    
+    monitorLockNodePending = false;
+
     assert wantToBeInElection :
         "Got a StatNode result after quitting election";
 
@@ -744,6 +747,11 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
     return state;
   }
 
+  @VisibleForTesting
+  synchronized boolean isMonitorLockNodePending() {
+    return monitorLockNodePending;
+  }
+
   private boolean reEstablishSession() {
     int connectionRetryCount = 0;
     boolean success = false;
@@ -949,7 +957,13 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
   }
 
   private void monitorLockNodeAsync() {
-    zkClient.exists(zkLockFilePath, 
+    if (monitorLockNodePending && monitorLockNodeClient == zkClient) {
+      LOG.info("Ignore duplicate monitor lock-node request.");
+      return;
+    }
+    monitorLockNodePending = true;
+    monitorLockNodeClient = zkClient;
+    zkClient.exists(zkLockFilePath,
         watcher, this,
         zkClient);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233cab89/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
index 2e578e2..83a3a4f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
@@ -452,6 +452,10 @@ public class TestActiveStandbyElector {
         Event.KeeperState.SyncConnected);
     elector.processWatchEvent(mockZK, mockEvent);
     verifyExistCall(1);
+    Assert.assertTrue(elector.isMonitorLockNodePending());
+    elector.processResult(Code.SESSIONEXPIRED.intValue(), ZK_LOCK_NAME,
+        mockZK, new Stat());
+    Assert.assertFalse(elector.isMonitorLockNodePending());
 
     // session expired should enter safe mode and initiate re-election
     // re-election checked via checking re-creation of new zookeeper and
@@ -495,6 +499,13 @@ public class TestActiveStandbyElector {
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     verifyExistCall(1);
+    Assert.assertTrue(elector.isMonitorLockNodePending());
+
+    Stat stat = new Stat();
+    stat.setEphemeralOwner(0L);
+    Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+    Assert.assertFalse(elector.isMonitorLockNodePending());
 
     WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
     Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
@@ -504,12 +515,18 @@ public class TestActiveStandbyElector {
         Event.EventType.NodeDataChanged);
     elector.processWatchEvent(mockZK, mockEvent);
     verifyExistCall(2);
+    Assert.assertTrue(elector.isMonitorLockNodePending());
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+    Assert.assertFalse(elector.isMonitorLockNodePending());
 
     // monitoring should be setup again after event is received
     Mockito.when(mockEvent.getType()).thenReturn(
         Event.EventType.NodeChildrenChanged);
     elector.processWatchEvent(mockZK, mockEvent);
     verifyExistCall(3);
+    Assert.assertTrue(elector.isMonitorLockNodePending());
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+    Assert.assertFalse(elector.isMonitorLockNodePending());
 
     // lock node deletion when in standby mode should create znode again
     // successful znode creation enters active state and sets monitor
@@ -524,6 +541,10 @@ public class TestActiveStandbyElector {
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
     verifyExistCall(4);
+    Assert.assertTrue(elector.isMonitorLockNodePending());
+    stat.setEphemeralOwner(1L);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+    Assert.assertFalse(elector.isMonitorLockNodePending());
 
     // lock node deletion in active mode should enter neutral mode and create
     // znode again successful znode creation enters active state and sets
@@ -538,6 +559,9 @@ public class TestActiveStandbyElector {
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
     verifyExistCall(5);
+    Assert.assertTrue(elector.isMonitorLockNodePending());
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+    Assert.assertFalse(elector.isMonitorLockNodePending());
 
     // bad path name results in fatal error
     Mockito.when(mockEvent.getPath()).thenReturn(null);
@@ -570,6 +594,13 @@ public class TestActiveStandbyElector {
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     verifyExistCall(1);
+    Assert.assertTrue(elector.isMonitorLockNodePending());
+
+    Stat stat = new Stat();
+    stat.setEphemeralOwner(0L);
+    Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+    Assert.assertFalse(elector.isMonitorLockNodePending());
 
     WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
     Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);


[16/24] hadoop git commit: HADOOP-12185. NetworkTopology is not efficient adding/getting/removing nodes. Contributed by Inigo Goiri

Posted by ar...@apache.org.
HADOOP-12185. NetworkTopology is not efficient adding/getting/removing nodes. Contributed by Inigo Goiri


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47a69ec7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47a69ec7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47a69ec7

Branch: refs/heads/HDFS-7240
Commit: 47a69ec7a5417cb56b75d07184dd6888ff068302
Parents: ed1e3ce
Author: Chris Douglas <cd...@apache.org>
Authored: Mon Jul 6 15:03:22 2015 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Jul 6 15:03:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/net/NetworkTopology.java  | 59 +++++++--------
 .../apache/hadoop/net/TestClusterTopology.java  | 75 ++++++++++++++++++--
 2 files changed, 102 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/47a69ec7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 63b6763..970ad40 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.net;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -80,6 +82,7 @@ public class NetworkTopology {
    */
   static class InnerNode extends NodeBase {
     protected List<Node> children=new ArrayList<Node>();
+    private Map<String, Node> childrenMap = new HashMap<String, Node>();
     private int numOfLeaves;
         
     /** Construct an InnerNode from a path-like string */
@@ -171,10 +174,13 @@ public class NetworkTopology {
         // this node is the parent of n; add n directly
         n.setParent(this);
         n.setLevel(this.level+1);
-        for(int i=0; i<children.size(); i++) {
-          if (children.get(i).getName().equals(n.getName())) {
-            children.set(i, n);
-            return false;
+        Node prev = childrenMap.put(n.getName(), n);
+        if (prev != null) {
+          for(int i=0; i<children.size(); i++) {
+            if (children.get(i).getName().equals(n.getName())) {
+              children.set(i, n);
+              return false;
+            }
           }
         }
         children.add(n);
@@ -183,17 +189,12 @@ public class NetworkTopology {
       } else {
         // find the next ancestor node
         String parentName = getNextAncestorName(n);
-        InnerNode parentNode = null;
-        for(int i=0; i<children.size(); i++) {
-          if (children.get(i).getName().equals(parentName)) {
-            parentNode = (InnerNode)children.get(i);
-            break;
-          }
-        }
+        InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
         if (parentNode == null) {
           // create a new InnerNode
           parentNode = createParentNode(parentName);
           children.add(parentNode);
+          childrenMap.put(parentNode.getName(), parentNode);
         }
         // add n to the subtree of the next ancestor node
         if (parentNode.add(n)) {
@@ -234,12 +235,15 @@ public class NetworkTopology {
                                            +parent+", is not a descendent of "+currentPath);
       if (isParent(n)) {
         // this node is the parent of n; remove n directly
-        for(int i=0; i<children.size(); i++) {
-          if (children.get(i).getName().equals(n.getName())) {
-            children.remove(i);
-            numOfLeaves--;
-            n.setParent(null);
-            return true;
+        if (childrenMap.containsKey(n.getName())) {
+          for (int i=0; i<children.size(); i++) {
+            if (children.get(i).getName().equals(n.getName())) {
+              children.remove(i);
+              childrenMap.remove(n.getName());
+              numOfLeaves--;
+              n.setParent(null);
+              return true;
+            }
           }
         }
         return false;
@@ -262,7 +266,8 @@ public class NetworkTopology {
         // if the parent node has no children, remove the parent node too
         if (isRemoved) {
           if (parentNode.getNumOfChildren() == 0) {
-            children.remove(i);
+            Node prev = children.remove(i);
+            childrenMap.remove(prev.getName());
           }
           numOfLeaves--;
         }
@@ -279,12 +284,7 @@ public class NetworkTopology {
       if (loc == null || loc.length() == 0) return this;
             
       String[] path = loc.split(PATH_SEPARATOR_STR, 2);
-      Node childnode = null;
-      for(int i=0; i<children.size(); i++) {
-        if (children.get(i).getName().equals(path[0])) {
-          childnode = children.get(i);
-        }
-      }
+      Node childnode = childrenMap.get(path[0]);
       if (childnode == null) return null; // non-existing node
       if (path.length == 1) return childnode;
       if (childnode instanceof InnerNode) {
@@ -311,10 +311,13 @@ public class NetworkTopology {
         isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
       if (isLeafParent()) { // children are leaves
         if (isLeaf) { // excluded node is a leaf node
-          int excludedIndex = children.indexOf(excludedNode);
-          if (excludedIndex != -1 && leafIndex >= 0) {
-            // excluded node is one of the children so adjust the leaf index
-            leafIndex = leafIndex>=excludedIndex ? leafIndex+1 : leafIndex;
+          if (excludedNode != null &&
+              childrenMap.containsKey(excludedNode.getName())) {
+            int excludedIndex = children.indexOf(excludedNode);
+            if (excludedIndex != -1 && leafIndex >= 0) {
+              // excluded node is one of the children so adjust the leaf index
+              leafIndex = leafIndex>=excludedIndex ? leafIndex+1 : leafIndex;
+            }
           }
         }
         // range check

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47a69ec7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
index 3ab663f..72fc5cb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.net;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
+import org.apache.commons.math3.stat.inference.ChiSquareTest;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -79,12 +81,14 @@ public class TestClusterTopology extends Assert {
   public void testCountNumNodes() throws Exception {
     // create the topology
     NetworkTopology cluster = new NetworkTopology();
-    cluster.add(getNewNode("node1", "/d1/r1"));
+    NodeElement node1 = getNewNode("node1", "/d1/r1");
+    cluster.add(node1);
     NodeElement node2 = getNewNode("node2", "/d1/r2");
     cluster.add(node2);
-    cluster.add(getNewNode("node3", "/d1/r3"));
-    NodeElement node3 = getNewNode("node4", "/d1/r4");
+    NodeElement node3 = getNewNode("node3", "/d1/r3");
     cluster.add(node3);
+    NodeElement node4 = getNewNode("node4", "/d1/r4");
+    cluster.add(node4);
     // create exclude list
     List<Node> excludedNodes = new ArrayList<Node>();
 
@@ -95,7 +99,7 @@ public class TestClusterTopology extends Assert {
     assertEquals("4 nodes should be available with extra excluded Node", 4,
         cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
     // add one existing node to exclude list
-    excludedNodes.add(node3);
+    excludedNodes.add(node4);
     assertEquals("excluded nodes with ROOT scope should be considered", 3,
         cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
     assertEquals("excluded nodes without ~ scope should be considered", 2,
@@ -112,6 +116,69 @@ public class TestClusterTopology extends Assert {
     // getting count with non-exist scope.
     assertEquals("No nodes should be considered for non-exist scope", 0,
         cluster.countNumOfAvailableNodes("/non-exist", excludedNodes));
+    // remove a node from the cluster
+    cluster.remove(node1);
+    assertEquals("1 node should be available", 1,
+        cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
+  }
+
+  /**
+   * Test how well we pick random nodes.
+   */
+  @Test
+  public void testChooseRandom() {
+    // create the topology
+    NetworkTopology cluster = new NetworkTopology();
+    NodeElement node1 = getNewNode("node1", "/d1/r1");
+    cluster.add(node1);
+    NodeElement node2 = getNewNode("node2", "/d1/r2");
+    cluster.add(node2);
+    NodeElement node3 = getNewNode("node3", "/d1/r3");
+    cluster.add(node3);
+    NodeElement node4 = getNewNode("node4", "/d1/r3");
+    cluster.add(node4);
+
+    // Number of iterations to do the test
+    int numIterations = 100;
+
+    // Pick random nodes
+    HashMap<String,Integer> histogram = new HashMap<String,Integer>();
+    for (int i=0; i<numIterations; i++) {
+      String randomNode = cluster.chooseRandom(NodeBase.ROOT).getName();
+      if (!histogram.containsKey(randomNode)) {
+        histogram.put(randomNode, 0);
+      }
+      histogram.put(randomNode, histogram.get(randomNode) + 1);
+    }
+    assertEquals("Random is not selecting all nodes", 4, histogram.size());
+
+    // Check with 99% confidence (alpha=0.01, as confidence = 100 * (1 - alpha))
+    ChiSquareTest chiSquareTest = new ChiSquareTest();
+    double[] expected = new double[histogram.size()];
+    long[] observed = new long[histogram.size()];
+    int j=0;
+    for (Integer occurrence : histogram.values()) {
+      expected[j] = 1.0 * numIterations / histogram.size();
+      observed[j] = occurrence;
+      j++;
+    }
+    boolean chiSquareTestRejected =
+        chiSquareTest.chiSquareTest(expected, observed, 0.01);
+
+    // Check that they have the proper distribution
+    assertFalse("Not choosing nodes randomly", chiSquareTestRejected);
+
+    // Pick random nodes excluding the 2 nodes in /d1/r3
+    histogram = new HashMap<String,Integer>();
+    for (int i=0; i<numIterations; i++) {
+      String randomNode = cluster.chooseRandom("~/d1/r3").getName();
+      if (!histogram.containsKey(randomNode)) {
+        histogram.put(randomNode, 0);
+      }
+      histogram.put(randomNode, histogram.get(randomNode) + 1);
+    }
+    assertEquals("Random is not selecting the nodes it should",
+        2, histogram.size());
   }
 
   private NodeElement getNewNode(String name, String rackLocation) {


[18/24] hadoop git commit: Release process for 2.7.1: Set the release date for 2.7.1.

Posted by ar...@apache.org.
Release process for 2.7.1: Set the release date for 2.7.1.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bf89ddb9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bf89ddb9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bf89ddb9

Branch: refs/heads/HDFS-7240
Commit: bf89ddb9b8ca27a34074b415f85599dd48b8fc50
Parents: d62b63d
Author: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vi...@apache.org>
Authored: Mon Jul 6 16:39:12 2015 -0700
Committer: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vi...@apache.org>
Committed: Mon Jul 6 16:39:59 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt | 2 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     | 2 +-
 hadoop-mapreduce-project/CHANGES.txt            | 2 +-
 hadoop-yarn-project/CHANGES.txt                 | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index f2f9d5c..fab78d4 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -942,7 +942,7 @@ Release 2.7.2 - UNRELEASED
 
   BUG FIXES
 
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d264f74..e40ea3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1024,7 +1024,7 @@ Release 2.7.2 - UNRELEASED
 
   BUG FIXES
 
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2458403..4c60174 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -545,7 +545,7 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo
     if mapId is not in the cache. (zhihai xu via devaraj)
 
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf89ddb9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 803d725..f35be75 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -610,7 +610,7 @@ Release 2.7.2 - UNRELEASED
     YARN-3508. Prevent processing preemption events on the main RM dispatcher. 
     (Varun Saxena via wangda)
 
-Release 2.7.1 - UNRELEASED
+Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES
 


[10/24] hadoop git commit: HDFS-8577. Avoid retrying to recover lease on a file which does not exist (Contributed by J.Andreina)

Posted by ar...@apache.org.
HDFS-8577. Avoid retrying to recover lease on a file which does not exist (Contributed by J.Andreina)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2eae130a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2eae130a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2eae130a

Branch: refs/heads/HDFS-7240
Commit: 2eae130ab9edd318c82503c2306f610f2b5a3e51
Parents: e59f6fa
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri Jul 3 13:35:48 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Fri Jul 3 13:35:48 2015 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt               |  3 +++
 .../java/org/apache/hadoop/hdfs/tools/DebugAdmin.java     | 10 ++++++++--
 .../java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java |  8 ++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2eae130a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6678a3e..4f184fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1002,6 +1002,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8706. Fix typo in datanode startup options in HDFSCommands.html.
     (Brahma Reddy Battula via Arpit Agarwal)
 
+    HDFS-8577. Avoid retrying to recover lease on a file which does not exist
+    (J.Andreina via vinayakumarb)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2eae130a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
index 41f1ca4..d179a5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.tools;
 
 import java.io.DataInputStream;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -254,6 +255,11 @@ public class DebugAdmin extends Configured implements Tool {
         IOException ioe = null;
         try {
           recovered = dfs.recoverLease(new Path(pathStr));
+        } catch (FileNotFoundException e) {
+          System.err.println("recoverLease got exception: " + e.getMessage());
+          System.err.println("Giving up on recoverLease for " + pathStr +
+              " after 1 try");
+          return 1;
         } catch (IOException e) {
           ioe = e;
         }
@@ -262,8 +268,8 @@ public class DebugAdmin extends Configured implements Tool {
           return 0;
         }
         if (ioe != null) {
-          System.err.println("recoverLease got exception: ");
-          ioe.printStackTrace();
+          System.err.println("recoverLease got exception: " +
+              ioe.getMessage());
         } else {
           System.err.println("recoverLease returned false.");
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2eae130a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
index 52b194d..07f70e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
@@ -37,6 +37,7 @@ import java.io.PrintStream;
 
 import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestDebugAdmin {
   private MiniDFSCluster cluster;
@@ -116,4 +117,11 @@ public class TestDebugAdmin {
             "-block", blockFile.getAbsolutePath()})
     );
   }
+
+  @Test(timeout = 60000)
+  public void testRecoverLeaseforFileNotFound() throws Exception {
+    assertTrue(runCmd(new String[] {
+        "recoverLease", "-path", "/foo", "-retries", "2" }).contains(
+        "Giving up on recoverLease for /foo after 1 try"));
+  }
 }


[11/24] hadoop git commit: YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner after create them. Contributed by zhihai xu

Posted by ar...@apache.org.
YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner
after create them. Contributed by zhihai xu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/688617d6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/688617d6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/688617d6

Branch: refs/heads/HDFS-7240
Commit: 688617d6d7e6377a37682b5676b805cc6e8cf3f0
Parents: 2eae130
Author: Xuan <xg...@apache.org>
Authored: Sat Jul 4 21:51:58 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Sat Jul 4 21:51:58 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../logaggregation/AggregatedLogFormat.java     | 83 +++++++++++---------
 2 files changed, 49 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/688617d6/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2009b47..803d725 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -589,6 +589,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id
     properly in log. (Bibin A Chundatt via devaraj)
 
+    YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner
+    after create them. (zhihai xu via xgong)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/688617d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index debe770..c9453b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -489,18 +489,23 @@ public class AggregatedLogFormat {
      * @throws IOException
      */
     public String getApplicationOwner() throws IOException {
-      TFile.Reader.Scanner ownerScanner = reader.createScanner();
-      LogKey key = new LogKey();
-      while (!ownerScanner.atEnd()) {
-        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
-        key.readFields(entry.getKeyStream());
-        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
-          DataInputStream valueStream = entry.getValueStream();
-          return valueStream.readUTF();
+      TFile.Reader.Scanner ownerScanner = null;
+      try {
+        ownerScanner = reader.createScanner();
+        LogKey key = new LogKey();
+        while (!ownerScanner.atEnd()) {
+          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
+          key.readFields(entry.getKeyStream());
+          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
+            DataInputStream valueStream = entry.getValueStream();
+            return valueStream.readUTF();
+          }
+          ownerScanner.advance();
         }
-        ownerScanner.advance();
+        return null;
+      } finally {
+        IOUtils.cleanup(LOG, ownerScanner);
       }
-      return null;
     }
 
     /**
@@ -513,38 +518,42 @@ public class AggregatedLogFormat {
     public Map<ApplicationAccessType, String> getApplicationAcls()
         throws IOException {
       // TODO Seek directly to the key once a comparator is specified.
-      TFile.Reader.Scanner aclScanner = reader.createScanner();
-      LogKey key = new LogKey();
-      Map<ApplicationAccessType, String> acls =
-          new HashMap<ApplicationAccessType, String>();
-      while (!aclScanner.atEnd()) {
-        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
-        key.readFields(entry.getKeyStream());
-        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
-          DataInputStream valueStream = entry.getValueStream();
-          while (true) {
-            String appAccessOp = null;
-            String aclString = null;
-            try {
-              appAccessOp = valueStream.readUTF();
-            } catch (EOFException e) {
-              // Valid end of stream.
-              break;
-            }
-            try {
-              aclString = valueStream.readUTF();
-            } catch (EOFException e) {
-              throw new YarnRuntimeException("Error reading ACLs", e);
+      TFile.Reader.Scanner aclScanner = null;
+      try {
+        aclScanner = reader.createScanner();
+        LogKey key = new LogKey();
+        Map<ApplicationAccessType, String> acls =
+            new HashMap<ApplicationAccessType, String>();
+        while (!aclScanner.atEnd()) {
+          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
+          key.readFields(entry.getKeyStream());
+          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
+            DataInputStream valueStream = entry.getValueStream();
+            while (true) {
+              String appAccessOp = null;
+              String aclString = null;
+              try {
+                appAccessOp = valueStream.readUTF();
+              } catch (EOFException e) {
+                // Valid end of stream.
+                break;
+              }
+              try {
+                aclString = valueStream.readUTF();
+              } catch (EOFException e) {
+                throw new YarnRuntimeException("Error reading ACLs", e);
+              }
+              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
             }
-            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
           }
-
+          aclScanner.advance();
         }
-        aclScanner.advance();
+        return acls;
+      } finally {
+        IOUtils.cleanup(LOG, aclScanner);
       }
-      return acls;
     }
-    
+
     /**
      * Read the next key and return the value-stream.
      * 


[03/24] hadoop git commit: YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)

Posted by ar...@apache.org.
YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0e4b0669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0e4b0669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0e4b0669

Branch: refs/heads/HDFS-7240
Commit: 0e4b06690ff51fbde3ab26f68fde8aeb32af69af
Parents: 152e5df
Author: Wangda Tan <wa...@apache.org>
Authored: Wed Jul 1 17:32:22 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Jul 1 17:32:22 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../server/resourcemanager/ResourceManager.java | 34 ---------
 .../monitor/SchedulingEditPolicy.java           |  6 +-
 .../monitor/SchedulingMonitor.java              |  3 +-
 .../ProportionalCapacityPreemptionPolicy.java   | 42 ++++++-----
 .../scheduler/ContainerPreemptEvent.java        |  8 +-
 .../scheduler/ContainerPreemptEventType.java    | 26 -------
 .../scheduler/capacity/CapacityScheduler.java   | 24 ++++++
 .../scheduler/event/SchedulerEventType.java     |  7 +-
 .../resourcemanager/TestRMDispatcher.java       | 79 ++++++++++++++++++++
 ...estProportionalCapacityPreemptionPolicy.java | 19 +++--
 ...pacityPreemptionPolicyForNodePartitions.java | 12 ++-
 12 files changed, 161 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3620a71..6839548 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -601,6 +601,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
     Saxena via jlowe)
 
+    YARN-3508. Prevent processing preemption events on the main RM dispatcher. 
+    (Varun Saxena via wangda)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4153ba1..1b606b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -614,9 +613,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
             YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
             SchedulingEditPolicy.class);
         if (policies.size() > 0) {
-          rmDispatcher.register(ContainerPreemptEventType.class,
-              new RMContainerPreemptEventDispatcher(
-                  (PreemptableResourceScheduler) scheduler));
           for (SchedulingEditPolicy policy : policies) {
             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
             // periodically check whether we need to take action to guarantee
@@ -787,36 +783,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   @Private
-  public static final class
-    RMContainerPreemptEventDispatcher
-      implements EventHandler<ContainerPreemptEvent> {
-
-    private final PreemptableResourceScheduler scheduler;
-
-    public RMContainerPreemptEventDispatcher(
-        PreemptableResourceScheduler scheduler) {
-      this.scheduler = scheduler;
-    }
-
-    @Override
-    public void handle(ContainerPreemptEvent event) {
-      ApplicationAttemptId aid = event.getAppId();
-      RMContainer container = event.getContainer();
-      switch (event.getType()) {
-      case DROP_RESERVATION:
-        scheduler.dropContainerReservation(container);
-        break;
-      case PREEMPT_CONTAINER:
-        scheduler.preemptContainer(aid, container);
-        break;
-      case KILL_CONTAINER:
-        scheduler.killContainer(container);
-        break;
-      }
-    }
-  }
-
-  @Private
   public static final class ApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
index 1ebc19f..0d587d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
@@ -18,14 +18,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 
 public interface SchedulingEditPolicy {
 
-  public void init(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
+  public void init(Configuration config, RMContext context,
       PreemptableResourceScheduler scheduler);
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 1682f7d..d4c129b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -54,9 +54,8 @@ public class SchedulingMonitor extends AbstractService {
     return scheduleEditPolicy;
   }
 
-  @SuppressWarnings("unchecked")
   public void serviceInit(Configuration conf) throws Exception {
-    scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+    scheduleEditPolicy.init(conf, rmContext,
         (PreemptableResourceScheduler) rmContext.getScheduler());
     this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
     super.serviceInit(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 1f47b5f..5a20a6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -38,13 +38,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -118,8 +118,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
-  // the dispatcher to send preempt and kill events
-  public EventHandler<ContainerPreemptEvent> dispatcher;
+  private RMContext rmContext;
 
   private final Clock clock;
   private double maxIgnoredOverCapacity;
@@ -141,20 +140,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   }
 
   public ProportionalCapacityPreemptionPolicy(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
-      CapacityScheduler scheduler) {
-    this(config, dispatcher, scheduler, new SystemClock());
+      RMContext context, CapacityScheduler scheduler) {
+    this(config, context, scheduler, new SystemClock());
   }
 
   public ProportionalCapacityPreemptionPolicy(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
-      CapacityScheduler scheduler, Clock clock) {
-    init(config, dispatcher, scheduler);
+      RMContext context, CapacityScheduler scheduler, Clock clock) {
+    init(config, context, scheduler);
     this.clock = clock;
   }
 
-  public void init(Configuration config,
-      EventHandler<ContainerPreemptEvent> disp,
+  public void init(Configuration config, RMContext context,
       PreemptableResourceScheduler sched) {
     LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
     assert null == scheduler : "Unexpected duplicate call to init";
@@ -163,7 +159,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
           sched.getClass().getCanonicalName() + " not instance of " +
           CapacityScheduler.class.getCanonicalName());
     }
-    dispatcher = disp;
+    rmContext = context;
     scheduler = (CapacityScheduler) sched;
     maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
     naturalTerminationFactor =
@@ -196,6 +192,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param root the root of the CapacityScheduler queue hierarchy
    * @param clusterResources the total amount of resources in the cluster
    */
+  @SuppressWarnings("unchecked")
   private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
     // All partitions to look at
@@ -248,8 +245,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // preempt (or kill) the selected containers
     for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
          : toPreempt.entrySet()) {
+      ApplicationAttemptId appAttemptId = e.getKey();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Send to scheduler: in app=" + e.getKey()
+        LOG.debug("Send to scheduler: in app=" + appAttemptId
             + " #containers-to-be-preempted=" + e.getValue().size());
       }
       for (RMContainer container : e.getValue()) {
@@ -257,13 +255,15 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         if (preempted.get(container) != null &&
             preempted.get(container) + maxWaitTime < clock.getTime()) {
           // kill it
-          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
-                ContainerPreemptEventType.KILL_CONTAINER));
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.KILL_CONTAINER));
           preempted.remove(container);
         } else {
           //otherwise just send preemption events
-          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
-                ContainerPreemptEventType.PREEMPT_CONTAINER));
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.PREEMPT_CONTAINER));
           if (preempted.get(container) == null) {
             preempted.put(container, clock.getTime());
           }
@@ -735,6 +735,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * Given a target preemption for a specific application, select containers
    * to preempt (after unreserving all reservation for that app).
    */
+  @SuppressWarnings("unchecked")
   private void preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Map<String, Resource> resToObtainByPartition,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
@@ -758,8 +759,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
           clusterResource, preemptMap);
 
       if (!observeOnly) {
-        dispatcher.handle(new ContainerPreemptEvent(appId, c,
-            ContainerPreemptEventType.DROP_RESERVATION));
+        rmContext.getDispatcher().getEventHandler().handle(
+            new ContainerPreemptEvent(
+                appId, c, SchedulerEventType.DROP_RESERVATION));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
index 8eba48d..7ab2758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
@@ -19,20 +19,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 
 /**
  * Simple event class used to communicate containers unreservations, preemption, killing
  */
-public class ContainerPreemptEvent
-    extends AbstractEvent<ContainerPreemptEventType> {
+public class ContainerPreemptEvent extends SchedulerEvent {
 
   private final ApplicationAttemptId aid;
   private final RMContainer container;
 
   public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
-      ContainerPreemptEventType type) {
+      SchedulerEventType type) {
     super(type);
     this.aid = aid;
     this.container = container;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
deleted file mode 100644
index a70a836..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-public enum ContainerPreemptEventType {
-
-  DROP_RESERVATION,
-  PREEMPT_CONTAINER,
-  KILL_CONTAINER
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index f1d0f9c..141aa7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -1346,6 +1347,29 @@ public class CapacityScheduler extends
           RMContainerEventType.EXPIRE);
     }
     break;
+    case DROP_RESERVATION:
+    {
+      ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
+      RMContainer container = dropReservationEvent.getContainer();
+      dropContainerReservation(container);
+    }
+    break;
+    case PREEMPT_CONTAINER:
+    {
+      ContainerPreemptEvent preemptContainerEvent =
+          (ContainerPreemptEvent)event;
+      ApplicationAttemptId aid = preemptContainerEvent.getAppId();
+      RMContainer containerToBePreempted = preemptContainerEvent.getContainer();
+      preemptContainer(aid, containerToBePreempted);
+    }
+    break;
+    case KILL_CONTAINER:
+    {
+      ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
+      RMContainer containerToBeKilled = killContainerEvent.getContainer();
+      killContainer(containerToBeKilled);
+    }
+    break;
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 13aecb3..9de935b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -36,5 +36,10 @@ public enum SchedulerEventType {
   APP_ATTEMPT_REMOVED,
 
   // Source: ContainerAllocationExpirer
-  CONTAINER_EXPIRED
+  CONTAINER_EXPIRED,
+
+  // Source: SchedulingEditPolicy
+  DROP_RESERVATION,
+  PREEMPT_CONTAINER,
+  KILL_CONTAINER
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
new file mode 100644
index 0000000..db7c96a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMDispatcher {
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testSchedulerEventDispatcherForPreemptionEvents() {
+    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+    CapacityScheduler sched = spy(new CapacityScheduler());
+    YarnConfiguration conf = new YarnConfiguration();
+    SchedulerEventDispatcher schedulerDispatcher =
+        new SchedulerEventDispatcher(sched);
+    rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
+    rmDispatcher.init(conf);
+    rmDispatcher.start();
+    schedulerDispatcher.init(conf);
+    schedulerDispatcher.start();
+    try {
+      ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
+      RMContainer container = mock(RMContainer.class);
+      ContainerPreemptEvent event1 = new ContainerPreemptEvent(
+          appAttemptId, container, SchedulerEventType.DROP_RESERVATION);
+      rmDispatcher.getEventHandler().handle(event1);
+      ContainerPreemptEvent event2 = new ContainerPreemptEvent(
+           appAttemptId, container, SchedulerEventType.KILL_CONTAINER);
+      rmDispatcher.getEventHandler().handle(event2);
+      ContainerPreemptEvent event3 = new ContainerPreemptEvent(
+          appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER);
+      rmDispatcher.getEventHandler().handle(event3);
+      // Wait for events to be processed by scheduler dispatcher.
+      Thread.sleep(1000);
+      verify(sched, times(3)).handle(any(SchedulerEvent.class));
+      verify(sched).dropContainerReservation(container);
+      verify(sched).preemptContainer(appAttemptId, container);
+      verify(sched).killContainer(container);
+    } catch (InterruptedException e) {
+      Assert.fail();
+    } finally {
+      schedulerDispatcher.stop();
+      rmDispatcher.stop();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 6c0ed6c..2c0c6d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -66,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -104,7 +106,7 @@ public class TestProportionalCapacityPreemptionPolicy {
   RMContext rmContext = null;
   RMNodeLabelsManager lm = null;
   CapacitySchedulerConfiguration schedConf = null;
-  EventHandler<ContainerPreemptEvent> mDisp = null;
+  EventHandler<SchedulerEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
   Resource clusterResources = null;
   final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -164,6 +166,9 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mCS.getRMContext()).thenReturn(rmContext);
     when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
+    Dispatcher disp = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(disp);
+    when(disp.getEventHandler()).thenReturn(mDisp);
     rand = new Random();
     long seed = rand.nextLong();
     System.out.println(name.getMethodName() + " SEED: " + seed);
@@ -866,12 +871,12 @@ public class TestProportionalCapacityPreemptionPolicy {
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
-    private final ContainerPreemptEventType type;
+    private final SchedulerEventType type;
     IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
       this(appAttId, PREEMPT_CONTAINER);
     }
     IsPreemptionRequestFor(ApplicationAttemptId appAttId,
-        ContainerPreemptEventType type) {
+        SchedulerEventType type) {
       this.appAttId = appAttId;
       this.type = type;
     }
@@ -888,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicy {
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy =
-      new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+      new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index e13320c..114769c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -50,13 +50,13 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -92,7 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
   private Configuration conf = null;
   private CapacitySchedulerConfiguration csConf = null;
   private CapacityScheduler cs = null;
-  private EventHandler<ContainerPreemptEvent> mDisp = null;
+  private EventHandler<SchedulerEvent> mDisp = null;
   private ProportionalCapacityPreemptionPolicy policy = null;
   private Resource clusterResource = null;
 
@@ -125,11 +126,14 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
 
     rmContext = mock(RMContext.class);
     when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+    Dispatcher disp = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(disp);
+    when(disp.getEventHandler()).thenReturn(mDisp);
     csConf = new CapacitySchedulerConfiguration();
     when(cs.getConfiguration()).thenReturn(csConf);
     when(cs.getRMContext()).thenReturn(rmContext);
 
-    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
     partitionToResource = new HashMap<>();
     nodeIdToSchedulerNodes = new HashMap<>();
     nameToCSQueues = new HashMap<>();
@@ -828,7 +832,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     when(cs.getClusterResource()).thenReturn(clusterResource);
     mockApplications(appsConfig);
 
-    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
   }
 
   private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,


[23/24] hadoop git commit: HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu) Moved CHANGES.txt entry to 2.7.2

Posted by ar...@apache.org.
HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync multiple times (Contributed by zhihai xu)
Moved CHANGES.txt entry to 2.7.2


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0febce0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0febce0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0febce0

Branch: refs/heads/HDFS-7240
Commit: e0febce0e74ec69597376774f771da46834c42b1
Parents: c40bdb5
Author: Vinayakumar B <vi...@apache.org>
Authored: Tue Jul 7 18:13:35 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Tue Jul 7 18:13:35 2015 +0530

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0febce0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 5d11db9..ee96eee 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -930,9 +930,6 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12164. Fix TestMove and TestFsShellReturnCode failed to get command
     name using reflection. (Lei (Eddy) Xu)
 
-    HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
-    multiple times (zhihai xu via vinayakumarb)
-
     HADOOP-12117. Potential NPE from Configuration#loadProperty with
     allowNullValueProperties set. (zhihai xu via vinayakumarb)
 
@@ -948,6 +945,9 @@ Release 2.7.2 - UNRELEASED
 
   BUG FIXES
 
+    HADOOP-12186. ActiveStandbyElector shouldn't call monitorLockNodeAsync
+    multiple times (zhihai xu via vinayakumarb)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES


[24/24] hadoop git commit: Merge remote-tracking branch 'apache-commit/trunk' into HDFS-7240

Posted by ar...@apache.org.
Merge remote-tracking branch 'apache-commit/trunk' into HDFS-7240


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/83f39a32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/83f39a32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/83f39a32

Branch: refs/heads/HDFS-7240
Commit: 83f39a32c00dc10c3960ceba58210e001b41576d
Parents: 93cebb2 e0febce
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Jul 7 09:00:10 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Jul 7 09:00:10 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  19 +-
 .../org/apache/hadoop/conf/Configuration.java   |   8 +-
 .../apache/hadoop/fs/RawLocalFileSystem.java    |  36 +--
 .../java/org/apache/hadoop/fs/shell/Mkdir.java  |   3 +-
 .../apache/hadoop/ha/ActiveStandbyElector.java  |  20 +-
 .../org/apache/hadoop/ipc/RpcClientUtil.java    |  24 ++
 .../main/java/org/apache/hadoop/ipc/Server.java |   4 +-
 .../org/apache/hadoop/net/NetworkTopology.java  |  62 ++---
 .../org/apache/hadoop/util/StringUtils.java     |   8 +
 .../org/apache/hadoop/net/unix/DomainSocket.c   |   9 +-
 .../apache/hadoop/conf/TestConfiguration.java   |  15 ++
 .../org/apache/hadoop/fs/SymlinkBaseTest.java   |  45 +++-
 .../apache/hadoop/fs/TestLocalFileSystem.java   |  26 ++-
 .../apache/hadoop/fs/TestSymlinkLocalFS.java    |  18 ++
 .../hadoop/fs/shell/TestCopyPreserveFlag.java   |  63 ++---
 .../hadoop/ha/TestActiveStandbyElector.java     |  31 +++
 .../apache/hadoop/net/TestClusterTopology.java  |  75 +++++-
 .../org/apache/hadoop/util/TestStringUtils.java |   4 +
 .../apache/hadoop/hdfs/web/JsonUtilClient.java  |  14 ++
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      |   2 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  17 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   2 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 233 +++++++++++++------
 .../hdfs/server/blockmanagement/BlockInfo.java  |   7 +-
 .../blockmanagement/BlockInfoContiguous.java    |   9 +-
 .../BlockInfoUnderConstruction.java             |  22 +-
 .../BlockInfoUnderConstructionContiguous.java   |  13 +-
 .../server/blockmanagement/BlockManager.java    | 143 ++++++------
 .../hdfs/server/blockmanagement/BlocksMap.java  |   4 +-
 .../ContiguousBlockStorageOp.java               |   7 +-
 .../blockmanagement/CorruptReplicasMap.java     |  62 +++--
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |   6 +-
 .../hadoop/hdfs/server/namenode/FSEditLog.java  |  28 ++-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   2 +-
 .../hdfs/server/namenode/NamenodeFsck.java      |  12 +-
 .../apache/hadoop/hdfs/tools/DebugAdmin.java    |  10 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   6 +-
 .../hadoop/hdfs/TestDFSClientRetries.java       |   2 +-
 .../blockmanagement/BlockManagerTestUtil.java   |   7 +-
 .../server/blockmanagement/TestBlockInfo.java   |  10 +-
 .../blockmanagement/TestBlockManager.java       |  10 +-
 .../blockmanagement/TestCorruptReplicaInfo.java |  15 +-
 .../hadoop/hdfs/server/mover/TestMover.java     |  19 ++
 .../hdfs/server/namenode/FSXAttrBaseTest.java   |   5 +-
 .../hadoop/hdfs/tools/TestDebugAdmin.java       |   8 +
 .../TestOfflineImageViewerForXAttr.java         |   3 +
 .../org/apache/hadoop/tracing/TestTracing.java  |  18 +-
 hadoop-mapreduce-project/CHANGES.txt            |   5 +-
 .../apache/hadoop/mapred/ShuffleHandler.java    |   3 +-
 .../hadoop/mapred/TestShuffleHandler.java       | 101 ++++++++
 hadoop-yarn-project/CHANGES.txt                 |  20 +-
 .../logaggregation/AggregatedLogFormat.java     |  83 ++++---
 ...TimelineAuthenticationFilterInitializer.java |   5 +-
 .../nodemanager/LinuxContainerExecutor.java     |  12 +-
 .../linux/privileged/PrivilegedOperation.java   |   1 +
 .../privileged/PrivilegedOperationExecutor.java |   2 +-
 .../logaggregation/AppLogAggregatorImpl.java    |   6 +-
 .../util/CgroupsLCEResourcesHandler.java        |   6 +-
 .../container-executor/impl/configuration.c     |   8 +-
 .../test/test-container-executor.c              |   4 +-
 .../TestLinuxContainerExecutorWithMocks.java    |  13 +-
 .../TestLogAggregationService.java              |  36 +++
 .../server/resourcemanager/ResourceManager.java |  34 ---
 .../monitor/SchedulingEditPolicy.java           |   6 +-
 .../monitor/SchedulingMonitor.java              |   3 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  42 ++--
 .../scheduler/ContainerPreemptEvent.java        |   8 +-
 .../scheduler/ContainerPreemptEventType.java    |  26 ---
 .../scheduler/capacity/CapacityScheduler.java   |  24 ++
 .../scheduler/event/SchedulerEventType.java     |   7 +-
 .../scheduler/fair/FSSchedulerNode.java         |  11 +-
 .../resourcemanager/TestRMDispatcher.java       |  79 +++++++
 ...estProportionalCapacityPreemptionPolicy.java |  19 +-
 ...pacityPreemptionPolicyForNodePartitions.java |  12 +-
 74 files changed, 1205 insertions(+), 537 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/83f39a32/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83f39a32/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83f39a32/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------


[04/24] hadoop git commit: HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)

Posted by ar...@apache.org.
HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a78d5074
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a78d5074
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a78d5074

Branch: refs/heads/HDFS-7240
Commit: a78d5074fb3da4779a6b5fd9947e60b9d755ee14
Parents: 0e4b066
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Jul 1 17:57:11 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Wed Jul 1 17:57:11 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  2 ++
 .../org/apache/hadoop/ipc/RpcClientUtil.java    | 24 ++++++++++++++++++++
 .../main/java/org/apache/hadoop/ipc/Server.java |  4 +++-
 .../org/apache/hadoop/tracing/TestTracing.java  | 18 +++++++--------
 4 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 39e2e5e..24431ba 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -673,6 +673,8 @@ Release 2.8.0 - UNRELEASED
 
     HADOOP-12124. Add HTrace support for FsShell (cmccabe)
 
+    HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
index d9bd71b..da1e699 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
@@ -210,4 +210,28 @@ public class RpcClientUtil {
     }
     return clazz.getSimpleName() + "#" + method.getName();
   }
+
+  /**
+   * Convert an RPC class method to a string.
+   * The format we want is
+   * 'ClassShortName#methodName'.
+   *
+   * For example, if the fully qualified method name is:
+   *   org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations
+   *
+   * the format we want is:
+   *   ClientProtocol#getBlockLocations
+   */
+  public static String toTraceName(String fullName) {
+    int lastPeriod = fullName.lastIndexOf('.');
+    if (lastPeriod < 0) {
+      return fullName;
+    }
+    int secondLastPeriod = fullName.lastIndexOf('.', lastPeriod - 1);
+    if (secondLastPeriod < 0) {
+      return fullName;
+    }
+    return fullName.substring(secondLastPeriod + 1, lastPeriod) + "#" +
+        fullName.substring(lastPeriod + 1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 98fffc0..4026fe0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -1963,7 +1963,9 @@ public abstract class Server {
         // If the incoming RPC included tracing info, always continue the trace
         TraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(),
                                              header.getTraceInfo().getParentId());
-        traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach();
+        traceSpan = Trace.startSpan(
+            RpcClientUtil.toTraceName(rpcRequest.toString()),
+            parentSpan).detach();
       }
 
       Call call = new Call(header.getCallId(), header.getRetryCount(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a78d5074/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
index 58b3659..c3d2c73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
@@ -67,18 +67,18 @@ public class TestTracing {
 
     String[] expectedSpanNames = {
       "testWriteTraceHooks",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
+      "ClientProtocol#create",
       "ClientNamenodeProtocol#create",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
+      "ClientProtocol#fsync",
       "ClientNamenodeProtocol#fsync",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
+      "ClientProtocol#complete",
       "ClientNamenodeProtocol#complete",
       "newStreamForCreate",
       "DFSOutputStream#write",
       "DFSOutputStream#close",
       "dataStreamer",
       "OpWriteBlockProto",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
+      "ClientProtocol#addBlock",
       "ClientNamenodeProtocol#addBlock"
     };
     SetSpanReceiver.assertSpanNamesFound(expectedSpanNames);
@@ -95,11 +95,11 @@ public class TestTracing {
     // and children of them are exception.
     String[] spansInTopTrace = {
       "testWriteTraceHooks",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
+      "ClientProtocol#create",
       "ClientNamenodeProtocol#create",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
+      "ClientProtocol#fsync",
       "ClientNamenodeProtocol#fsync",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
+      "ClientProtocol#complete",
       "ClientNamenodeProtocol#complete",
       "newStreamForCreate",
       "DFSOutputStream#write",
@@ -113,7 +113,7 @@ public class TestTracing {
 
     // test for timeline annotation added by HADOOP-11242
     Assert.assertEquals("called",
-        map.get("org.apache.hadoop.hdfs.protocol.ClientProtocol.create")
+        map.get("ClientProtocol#create")
            .get(0).getTimelineAnnotations()
            .get(0).getMessage());
 
@@ -131,7 +131,7 @@ public class TestTracing {
 
     String[] expectedSpanNames = {
       "testReadTraceHooks",
-      "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
+      "ClientProtocol#getBlockLocations",
       "ClientNamenodeProtocol#getBlockLocations",
       "OpReadBlockProto"
     };


[17/24] hadoop git commit: HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. Contributed by Jing Zhao.

Posted by ar...@apache.org.
HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d62b63d2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d62b63d2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d62b63d2

Branch: refs/heads/HDFS-7240
Commit: d62b63d297bff12d93de560dd50ddd48743b851d
Parents: 47a69ec
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Jul 6 15:54:07 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Jul 6 15:54:25 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../hdfs/server/blockmanagement/BlockInfo.java  |   7 +-
 .../blockmanagement/BlockInfoContiguous.java    |   9 +-
 .../BlockInfoUnderConstruction.java             |  22 ++-
 .../BlockInfoUnderConstructionContiguous.java   |  13 +-
 .../server/blockmanagement/BlockManager.java    | 143 +++++++++----------
 .../hdfs/server/blockmanagement/BlocksMap.java  |   4 +-
 .../ContiguousBlockStorageOp.java               |   7 +-
 .../blockmanagement/CorruptReplicasMap.java     |  62 ++++----
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |   6 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   2 +-
 .../hdfs/server/namenode/NamenodeFsck.java      |  12 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   6 +-
 .../blockmanagement/BlockManagerTestUtil.java   |   7 +-
 .../server/blockmanagement/TestBlockInfo.java   |  10 +-
 .../blockmanagement/TestBlockManager.java       |  10 +-
 .../blockmanagement/TestCorruptReplicaInfo.java |  15 +-
 17 files changed, 169 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9edc2af..d264f74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -701,6 +701,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8709. Clarify automatic sync in FSEditLog#logEdit. (wang)
 
+    HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 5ad992b..4df2f0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -179,7 +179,7 @@ public abstract class  BlockInfo extends Block
    *                      information indicating the index of the block in the
    *                      corresponding block group.
    */
-  abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
+  abstract void addStorage(DatanodeStorageInfo storage, Block reportedBlock);
 
   /**
    * Remove {@link DatanodeStorageInfo} location for a block
@@ -193,6 +193,11 @@ public abstract class  BlockInfo extends Block
   abstract void replaceBlock(BlockInfo newBlock);
 
   /**
+   * @return true if there is no storage storing the block
+   */
+  abstract boolean hasEmptyStorage();
+
+  /**
    * Find specified DatanodeStorageInfo.
    * @return DatanodeStorageInfo or null if not found.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index de64ad8..561faca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -45,8 +45,8 @@ public class BlockInfoContiguous extends BlockInfo {
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
-    return ContiguousBlockStorageOp.addStorage(this, storage);
+  void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+    ContiguousBlockStorageOp.addStorage(this, storage);
   }
 
   @Override
@@ -73,4 +73,9 @@ public class BlockInfoContiguous extends BlockInfo {
     ucBlock.setBlockCollection(getBlockCollection());
     return ucBlock;
   }
+
+  @Override
+  boolean hasEmptyStorage() {
+    return ContiguousBlockStorageOp.hasEmptyStorage(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index 9cd3987..7924709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -274,18 +273,17 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
           "No blocks found, lease removed.");
     }
     boolean allLiveReplicasTriedAsPrimary = true;
-    for (int i = 0; i < replicas.size(); i++) {
+    for (ReplicaUnderConstruction replica : replicas) {
       // Check if all replicas have been tried or not.
-      if (replicas.get(i).isAlive()) {
-        allLiveReplicasTriedAsPrimary =
-            (allLiveReplicasTriedAsPrimary &&
-                replicas.get(i).getChosenAsPrimary());
+      if (replica.isAlive()) {
+        allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary
+            && replica.getChosenAsPrimary();
       }
     }
     if (allLiveReplicasTriedAsPrimary) {
       // Just set all the replicas to be chosen whether they are alive or not.
-      for (int i = 0; i < replicas.size(); i++) {
-        replicas.get(i).setChosenAsPrimary(false);
+      for (ReplicaUnderConstruction replica : replicas) {
+        replica.setChosenAsPrimary(false);
       }
     }
     long mostRecentLastUpdate = 0;
@@ -345,10 +343,6 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
    * Convert an under construction block to a complete block.
    *
    * @return a complete block.
-   * @throws IOException
-   *           if the state of the block (the generation stamp and the length)
-   *           has not been committed by the client or it does not have at
-   *           least a minimal number of replicas reported from data-nodes.
    */
   public abstract BlockInfo convertToCompleteBlock();
 
@@ -386,8 +380,8 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
   }
 
   private void appendUCParts(StringBuilder sb) {
-    sb.append("{UCState=").append(blockUCState)
-      .append(", truncateBlock=" + truncateBlock)
+    sb.append("{UCState=").append(blockUCState).append(", truncateBlock=")
+      .append(truncateBlock)
       .append(", primaryNodeIndex=").append(primaryNodeIndex)
       .append(", replicas=[");
     if (replicas != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
index d3cb337..963f247 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
@@ -55,10 +55,6 @@ public class BlockInfoUnderConstructionContiguous extends
    * Convert an under construction block to a complete block.
    *
    * @return BlockInfo - a complete block.
-   * @throws IOException if the state of the block
-   * (the generation stamp and the length) has not been committed by
-   * the client or it does not have at least a minimal number of replicas
-   * reported from data-nodes.
    */
   @Override
   public BlockInfoContiguous convertToCompleteBlock() {
@@ -69,8 +65,8 @@ public class BlockInfoUnderConstructionContiguous extends
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
-    return ContiguousBlockStorageOp.addStorage(this, storage);
+  void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+    ContiguousBlockStorageOp.addStorage(this, storage);
   }
 
   @Override
@@ -89,6 +85,11 @@ public class BlockInfoUnderConstructionContiguous extends
   }
 
   @Override
+  boolean hasEmptyStorage() {
+    return ContiguousBlockStorageOp.hasEmptyStorage(this);
+  }
+
+  @Override
   public void setExpectedLocations(DatanodeStorageInfo[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
     this.replicas = new ArrayList<>(numLocations);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0b60a97..6ae3ee2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
@@ -197,7 +196,7 @@ public class BlockManager implements BlockStatsMXBean {
    * notified of all block deletions that might have been pending
    * when the failover happened.
    */
-  private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
+  private final Set<BlockInfo> postponedMisreplicatedBlocks = Sets.newHashSet();
 
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
@@ -338,8 +337,7 @@ public class BlockManager implements BlockStatsMXBean {
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
     this.shouldCheckForEnoughRacks =
-        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
-            ? false : true;
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
 
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -465,8 +463,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Should the access keys be updated? */
   boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
-    return isBlockTokenEnabled()? blockTokenSecretManager.updateKeys(updateTime)
-        : false;
+    return isBlockTokenEnabled() && blockTokenSecretManager.updateKeys(updateTime);
   }
 
   public void activate(Configuration conf) {
@@ -519,14 +516,14 @@ public class BlockManager implements BlockStatsMXBean {
     synchronized (neededReplications) {
       out.println("Metasave: Blocks waiting for replication: " + 
                   neededReplications.size());
-      for (Block block : neededReplications) {
+      for (BlockInfo block : neededReplications) {
         dumpBlockMeta(block, out);
       }
     }
     
     // Dump any postponed over-replicated blocks
     out.println("Mis-replicated blocks that have been postponed:");
-    for (Block block : postponedMisreplicatedBlocks) {
+    for (BlockInfo block : postponedMisreplicatedBlocks) {
       dumpBlockMeta(block, out);
     }
 
@@ -544,11 +541,9 @@ public class BlockManager implements BlockStatsMXBean {
    * Dump the metadata for the given block in a human-readable
    * form.
    */
-  private void dumpBlockMeta(Block block, PrintWriter out) {
-    List<DatanodeDescriptor> containingNodes =
-                                      new ArrayList<DatanodeDescriptor>();
-    List<DatanodeStorageInfo> containingLiveReplicasNodes =
-      new ArrayList<>();
+  private void dumpBlockMeta(BlockInfo block, PrintWriter out) {
+    List<DatanodeDescriptor> containingNodes = new ArrayList<>();
+    List<DatanodeStorageInfo> containingLiveReplicasNodes = new ArrayList<>();
 
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
@@ -556,17 +551,16 @@ public class BlockManager implements BlockStatsMXBean {
         containingLiveReplicasNodes, numReplicas,
         UnderReplicatedBlocks.LEVEL);
     
-    // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are 
-    // not included in the numReplicas.liveReplicas() count
+    // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which
+    // are not included in the numReplicas.liveReplicas() count
     assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
     int usableReplicas = numReplicas.liveReplicas() +
                          numReplicas.decommissionedAndDecommissioning();
-    
-    if (block instanceof BlockInfo) {
-      BlockCollection bc = ((BlockInfo) block).getBlockCollection();
-      String fileName = (bc == null) ? "[orphaned]" : bc.getName();
-      out.print(fileName + ": ");
-    }
+
+    BlockCollection bc = block.getBlockCollection();
+    String fileName = (bc == null) ? "[orphaned]" : bc.getName();
+    out.print(fileName + ": ");
+
     // l: == live:, d: == decommissioned c: == corrupt e: == excess
     out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
               " (replicas:" +
@@ -575,8 +569,8 @@ public class BlockManager implements BlockStatsMXBean {
               " c: " + numReplicas.corruptReplicas() +
               " e: " + numReplicas.excessReplicas() + ") "); 
 
-    Collection<DatanodeDescriptor> corruptNodes = 
-                                  corruptReplicas.getNodes(block);
+    Collection<DatanodeDescriptor> corruptNodes =
+        corruptReplicas.getNodes(block);
     
     for (DatanodeStorageInfo storage : getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -813,7 +807,8 @@ public class BlockManager implements BlockStatsMXBean {
       final long offset, final long length, final int nrBlocksToReturn,
       final AccessMode mode) throws IOException {
     int curBlk;
-    long curPos = 0, blkSize = 0;
+    long curPos = 0;
+    long blkSize;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
       blkSize = blocks[curBlk].getNumBytes();
@@ -1204,10 +1199,11 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * 
-   * @param b
+   * Mark a replica (of a contiguous block) or an internal block (of a striped
+   * block group) as corrupt.
+   * @param b Indicating the reported bad block and the corresponding BlockInfo
+   *          stored in blocksMap.
    * @param storageInfo storage that contains the block, if known. null otherwise.
-   * @throws IOException
    */
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       DatanodeStorageInfo storageInfo,
@@ -1228,7 +1224,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+    corruptReplicas.addToCorruptReplicasMap(b.stored, node, b.reason,
         b.reasonCode);
 
     NumberReplicas numberOfReplicas = countNodes(b.stored);
@@ -1250,7 +1246,7 @@ public class BlockManager implements BlockStatsMXBean {
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
         || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
-      invalidateBlock(b, node);
+      invalidateBlock(b, node, numberOfReplicas);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(b.stored, -1, 0);
@@ -1258,12 +1254,15 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Invalidates the given block on the given datanode.
-   * @return true if the block was successfully invalidated and no longer
-   * present in the BlocksMap
+   * Invalidates the given block on the given datanode. Note that before this
+   * call we have already checked the current live replicas of the block and
+   * make sure it's safe to invalidate the replica.
+   *
+   * @return true if the replica was successfully invalidated and no longer
+   *         associated with the DataNode.
    */
-  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
-      ) throws IOException {
+  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
+      NumberReplicas nr) throws IOException {
     blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn);
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1272,35 +1271,30 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Check how many copies we have of the block
-    NumberReplicas nr = countNodes(b.stored);
     if (nr.replicasOnStaleNodes() > 0) {
       blockLog.info("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
           "nodes with potentially out-of-date block reports", b, dn,
           nr.replicasOnStaleNodes());
-      postponeBlock(b.corrupted);
+      postponeBlock(b.stored);
       return false;
-    } else if (nr.liveReplicas() >= 1) {
-      // If we have at least one copy on a live node, then we can delete it.
+    } else {
+      // we already checked the number of replicas in the caller of this
+      // function and we know there is at least one copy on a live node, so we
+      // can delete it.
       addToInvalidates(b.corrupted, dn);
       removeStoredBlock(b.stored, node);
       blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
           b, dn);
       return true;
-    } else {
-      blockLog.info("BLOCK* invalidateBlocks: {} on {} is the only copy and" +
-          " was not deleted", b, dn);
-      return false;
     }
   }
 
-
   public void setPostponeBlocksFromFuture(boolean postpone) {
     this.shouldPostponeBlocksFromFuture  = postpone;
   }
 
-
-  private void postponeBlock(Block blk) {
+  private void postponeBlock(BlockInfo blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
       postponedMisreplicatedBlocksCount.incrementAndGet();
     }
@@ -1374,7 +1368,7 @@ public class BlockManager implements BlockStatsMXBean {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
-    BlockCollection bc = null;
+    BlockCollection bc;
     int additionalReplRequired;
 
     int scheduledWork = 0;
@@ -1535,9 +1529,9 @@ public class BlockManager implements BlockStatsMXBean {
         DatanodeStorageInfo[] targets = rw.targets;
         if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
-          for (int k = 0; k < targets.length; k++) {
+          for (DatanodeStorageInfo target : targets) {
             targetList.append(' ');
-            targetList.append(targets[k].getDatanodeDescriptor());
+            targetList.append(target.getDatanodeDescriptor());
           }
           blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
               rw.block, targetList);
@@ -1614,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean {
     List<DatanodeDescriptor> datanodeDescriptors = null;
     if (nodes != null) {
       datanodeDescriptors = new ArrayList<>(nodes.size());
-      for (int i = 0; i < nodes.size(); i++) {
-        DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
+      for (String nodeStr : nodes) {
+        DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodeStr);
         if (node != null) {
           datanodeDescriptors.add(node);
         }
@@ -1654,7 +1648,7 @@ public class BlockManager implements BlockStatsMXBean {
    *         the given block
    */
    @VisibleForTesting
-   DatanodeDescriptor chooseSourceDatanode(Block block,
+   DatanodeDescriptor chooseSourceDatanode(BlockInfo block,
        List<DatanodeDescriptor> containingNodes,
        List<DatanodeStorageInfo>  nodesContainingLiveReplicas,
        NumberReplicas numReplicas,
@@ -1734,16 +1728,16 @@ public class BlockManager implements BlockStatsMXBean {
     if (timedOutItems != null) {
       namesystem.writeLock();
       try {
-        for (int i = 0; i < timedOutItems.length; i++) {
+        for (BlockInfo timedOutItem : timedOutItems) {
           /*
            * Use the blockinfo from the blocksmap to be certain we're working
            * with the most up-to-date block information (e.g. genstamp).
            */
-          BlockInfo bi = getStoredBlock(timedOutItems[i]);
+          BlockInfo bi = getStoredBlock(timedOutItem);
           if (bi == null) {
             continue;
           }
-          NumberReplicas num = countNodes(timedOutItems[i]);
+          NumberReplicas num = countNodes(timedOutItem);
           if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) {
             neededReplications.add(bi, num.liveReplicas(),
                 num.decommissionedAndDecommissioning(), getReplication(bi));
@@ -1760,7 +1754,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
     assert namesystem.hasReadLock();
-    DatanodeDescriptor node = null;
+    DatanodeDescriptor node;
     try {
       node = datanodeManager.getDatanode(nodeReg);
     } catch (UnregisteredNodeException e) {
@@ -2022,7 +2016,7 @@ public class BlockManager implements BlockStatsMXBean {
           startIndex += (base+1);
         }
       }
-      Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
+      Iterator<BlockInfo> it = postponedMisreplicatedBlocks.iterator();
       for (int tmp = 0; tmp < startIndex; tmp++) {
         it.next();
       }
@@ -2117,7 +2111,7 @@ public class BlockManager implements BlockStatsMXBean {
       long oldGenerationStamp, long oldNumBytes,
       DatanodeStorageInfo[] newStorages) throws IOException {
     assert namesystem.hasWriteLock();
-    BlockToMarkCorrupt b = null;
+    BlockToMarkCorrupt b;
     if (block.getGenerationStamp() != oldGenerationStamp) {
       b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
           "genstamp does not match " + oldGenerationStamp
@@ -2719,7 +2713,7 @@ public class BlockManager implements BlockStatsMXBean {
           " but corrupt replicas map has " + corruptReplicasCount);
     }
     if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
-      invalidateCorruptReplicas(storedBlock, reportedBlock);
+      invalidateCorruptReplicas(storedBlock, reportedBlock, num);
     }
     return storedBlock;
   }
@@ -2752,18 +2746,20 @@ public class BlockManager implements BlockStatsMXBean {
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
+  private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
+      NumberReplicas numberReplicas) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
       return;
     // make a copy of the array of nodes in order to avoid
     // ConcurrentModificationException, when the block is removed from the node
-    DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
+    DatanodeDescriptor[] nodesCopy = nodes.toArray(
+        new DatanodeDescriptor[nodes.size()]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
         if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
-            Reason.ANY), node)) {
+            Reason.ANY), node, numberReplicas)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
@@ -2813,7 +2809,6 @@ public class BlockManager implements BlockStatsMXBean {
         replicationQueuesInitializer.join();
       } catch (final InterruptedException e) {
         LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
-        return;
       } finally {
         replicationQueuesInitializer = null;
       }
@@ -3175,8 +3170,7 @@ public class BlockManager implements BlockStatsMXBean {
       CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
           .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
       if (cblock != null) {
-        boolean removed = false;
-        removed |= node.getPendingCached().remove(cblock);
+        boolean removed = node.getPendingCached().remove(cblock);
         removed |= node.getCached().remove(cblock);
         removed |= node.getPendingUncached().remove(cblock);
         if (removed) {
@@ -3392,7 +3386,7 @@ public class BlockManager implements BlockStatsMXBean {
     int excess = 0;
     int stale = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
@@ -3413,7 +3407,8 @@ public class BlockManager implements BlockStatsMXBean {
         stale++;
       }
     }
-    return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
+    return new NumberReplicas(live, decommissioned, decommissioning, corrupt,
+        excess, stale);
   }
 
   /** 
@@ -3596,8 +3591,6 @@ public class BlockManager implements BlockStatsMXBean {
       String src, BlockInfo[] blocks) {
     for (BlockInfo b: blocks) {
       if (!b.isComplete()) {
-        final BlockInfoUnderConstruction uc =
-            (BlockInfoUnderConstruction)b;
         final int numNodes = b.numNodes();
         final int min = getMinStorageNum(b);
         final BlockUCState state = b.getBlockUCState();
@@ -3723,11 +3716,7 @@ public class BlockManager implements BlockStatsMXBean {
     return blocksMap.getBlockCollection(b);
   }
 
-  public int numCorruptReplicas(Block block) {
-    return corruptReplicas.numCorruptReplicas(block);
-  }
-
-  public void removeBlockFromMap(Block block) {
+  public void removeBlockFromMap(BlockInfo block) {
     removeFromExcessReplicateMap(block);
     blocksMap.removeBlock(block);
     // If block is removed from blocksMap remove it from corruptReplicasMap
@@ -3737,7 +3726,7 @@ public class BlockManager implements BlockStatsMXBean {
   /**
    * If a block is removed from blocksMap, remove it from excessReplicateMap.
    */
-  private void removeFromExcessReplicateMap(Block block) {
+  private void removeFromExcessReplicateMap(BlockInfo block) {
     for (DatanodeStorageInfo info : getStorages(block)) {
       String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
       LightWeightLinkedSet<BlockInfo> excessReplicas =
@@ -3768,14 +3757,14 @@ public class BlockManager implements BlockStatsMXBean {
   /**
    * Get the replicas which are corrupt for a given block.
    */
-  public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
+  public Collection<DatanodeDescriptor> getCorruptReplicas(BlockInfo block) {
     return corruptReplicas.getNodes(block);
   }
 
  /**
   * Get reason for certain corrupted replicas for a given block and a given dn.
   */
- public String getCorruptReason(Block block, DatanodeDescriptor node) {
+ public String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
    return corruptReplicas.getCorruptReason(block, node);
  }
 
@@ -3869,7 +3858,7 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();
     postponedMisreplicatedBlocksCount.set(0);
-  };
+  }
 
   public static LocatedBlock newLocatedBlock(
       ExtendedBlock b, DatanodeStorageInfo[] storages,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 0dbf485..85cea5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -117,7 +117,7 @@ class BlocksMap {
    * remove it from all data-node lists it belongs to;
    * and remove all data-node locations associated with the block.
    */
-  void removeBlock(Block block) {
+  void removeBlock(BlockInfo block) {
     BlockInfo blockInfo = blocks.remove(block);
     if (blockInfo == null)
       return;
@@ -190,7 +190,7 @@ class BlocksMap {
     // remove block from the data-node list and the node from the block info
     boolean removed = node.removeBlock(info);
 
-    if (info.getDatanode(0) == null     // no datanodes left
+    if (info.hasEmptyStorage()     // no datanodes left
               && info.isDeleted()) {  // does not belong to a file
       blocks.remove(b);  // remove block from the map
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
index 092f65e..70251e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ContiguousBlockStorageOp.java
@@ -45,13 +45,12 @@ class ContiguousBlockStorageOp {
     return last;
   }
 
-  static boolean addStorage(BlockInfo b, DatanodeStorageInfo storage) {
+  static void addStorage(BlockInfo b, DatanodeStorageInfo storage) {
     // find the last null node
     int lastNode = ensureCapacity(b, 1);
     b.setStorageInfo(lastNode, storage);
     b.setNext(lastNode, null);
     b.setPrevious(lastNode, null);
-    return true;
   }
 
   static boolean removeStorage(BlockInfo b,
@@ -103,4 +102,8 @@ class ContiguousBlockStorageOp {
           "newBlock already exists.");
     }
   }
+
+  static boolean hasEmptyStorage(BlockInfo b) {
+    return b.getStorageInfo(0) == null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
index fc2e234..9a0023d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.Server;
 
@@ -46,8 +46,12 @@ public class CorruptReplicasMap{
     CORRUPTION_REPORTED  // client or datanode reported the corruption
   }
 
-  private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
-    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
+  /**
+   * Used to track corrupted replicas (for contiguous block) or internal blocks
+   * (for striped block) and the corresponding DataNodes. For a striped block,
+   * the key here is the striped block group object stored in the blocksMap.
+   */
+  private final SortedMap<BlockInfo, Map<DatanodeDescriptor, Reason>> corruptReplicasMap = new TreeMap<>();
 
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -57,21 +61,21 @@ public class CorruptReplicasMap{
    * @param reason a textual reason (for logging purposes)
    * @param reasonCode the enum representation of the reason
    */
-  void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+  void addToCorruptReplicasMap(BlockInfo blk, DatanodeDescriptor dn,
       String reason, Reason reasonCode) {
     Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
-      nodes = new HashMap<DatanodeDescriptor, Reason>();
+      nodes = new HashMap<>();
       corruptReplicasMap.put(blk, nodes);
     }
-    
+
     String reasonText;
     if (reason != null) {
       reasonText = " because " + reason;
     } else {
       reasonText = "";
     }
-    
+
     if (!nodes.keySet().contains(dn)) {
       NameNode.blockStateChangeLog.info(
           "BLOCK NameSystem.addToCorruptReplicasMap: {} added as corrupt on "
@@ -92,7 +96,7 @@ public class CorruptReplicasMap{
    *
    * @param blk Block to be removed
    */
-  void removeFromCorruptReplicasMap(Block blk) {
+  void removeFromCorruptReplicasMap(BlockInfo blk) {
     if (corruptReplicasMap != null) {
       corruptReplicasMap.remove(blk);
     }
@@ -105,12 +109,13 @@ public class CorruptReplicasMap{
    * @return true if the removal is successful; 
              false if the replica is not in the map
    */ 
-  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
+  boolean removeFromCorruptReplicasMap(BlockInfo blk,
+      DatanodeDescriptor datanode) {
     return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
   }
 
-  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
-      Reason reason) {
+  boolean removeFromCorruptReplicasMap(BlockInfo blk,
+      DatanodeDescriptor datanode, Reason reason) {
     Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
     if (datanodes==null)
       return false;
@@ -139,11 +144,9 @@ public class CorruptReplicasMap{
    * @param blk Block for which nodes are requested
    * @return collection of nodes. Null if does not exists
    */
-  Collection<DatanodeDescriptor> getNodes(Block blk) {
-    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
-    if (nodes == null)
-      return null;
-    return nodes.keySet();
+  Collection<DatanodeDescriptor> getNodes(BlockInfo blk) {
+    Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+    return nodes != null ? nodes.keySet() : null;
   }
 
   /**
@@ -153,12 +156,12 @@ public class CorruptReplicasMap{
    * @param node DatanodeDescriptor which holds the replica
    * @return true if replica is corrupt, false if does not exists in this map
    */
-  boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
+  boolean isReplicaCorrupt(BlockInfo blk, DatanodeDescriptor node) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
     return ((nodes != null) && (nodes.contains(node)));
   }
 
-  int numCorruptReplicas(Block blk) {
+  int numCorruptReplicas(BlockInfo blk) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
     return (nodes == null) ? 0 : nodes.size();
   }
@@ -168,9 +171,9 @@ public class CorruptReplicasMap{
   }
 
   /**
-   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
+   * Return a range of corrupt replica block ids. Up to numExpectedBlocks
    * blocks starting at the next block after startingBlockId are returned
-   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
+   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
    * is null, up to numExpectedBlocks blocks are returned from the beginning.
    * If startingBlockId cannot be found, null is returned.
    *
@@ -181,44 +184,39 @@ public class CorruptReplicasMap{
    * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
    *
    */
+  @VisibleForTesting
   long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
                                    Long startingBlockId) {
     if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
       return null;
     }
-    
-    Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
-    
+    Iterator<BlockInfo> blockIt = corruptReplicasMap.keySet().iterator();
     // if the starting block id was specified, iterate over keys until
     // we find the matching block. If we find a matching block, break
-    // to leave the iterator on the next block after the specified block. 
+    // to leave the iterator on the next block after the specified block.
     if (startingBlockId != null) {
       boolean isBlockFound = false;
       while (blockIt.hasNext()) {
-        Block b = blockIt.next();
+        BlockInfo b = blockIt.next();
         if (b.getBlockId() == startingBlockId) {
           isBlockFound = true;
-          break; 
+          break;
         }
       }
-      
       if (!isBlockFound) {
         return null;
       }
     }
 
-    ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
-
+    ArrayList<Long> corruptReplicaBlockIds = new ArrayList<>();
     // append up to numExpectedBlocks blockIds to our list
     for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
       corruptReplicaBlockIds.add(blockIt.next().getBlockId());
     }
-    
     long[] ret = new long[corruptReplicaBlockIds.size()];
     for(int i=0; i<ret.length; i++) {
       ret[i] = corruptReplicaBlockIds.get(i);
     }
-    
     return ret;
   }
 
@@ -229,7 +227,7 @@ public class CorruptReplicasMap{
    * @param node datanode that contains this corrupted replica
    * @return reason
    */
-  String getCorruptReason(Block block, DatanodeDescriptor node) {
+  String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
     Reason reason = null;
     if(corruptReplicasMap.containsKey(block)) {
       if (corruptReplicasMap.get(block).containsKey(node)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 4830d5d..eebeac0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -71,7 +71,7 @@ class FSDirWriteFileOp {
   private FSDirWriteFileOp() {}
   static boolean unprotectedRemoveBlock(
       FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
-      Block block) throws IOException {
+      BlockInfo block) throws IOException {
     // modify file-> block and blocksMap
     // fileNode should be under construction
     BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
@@ -136,7 +136,9 @@ class FSDirWriteFileOp {
     fsd.writeLock();
     try {
       // Remove the block from the pending creates list
-      if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
+      BlockInfo storedBlock = fsd.getBlockManager().getStoredBlock(localBlock);
+      if (storedBlock != null &&
+          !unprotectedRemoveBlock(fsd, src, iip, file, storedBlock)) {
         return;
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 63ef985..96d6982 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -1035,7 +1035,7 @@ public class FSEditLogLoader {
         throw new IOException("Trying to remove more than one block from file "
             + path);
       }
-      Block oldBlock = oldBlocks[oldBlocks.length - 1];
+      BlockInfo oldBlock = oldBlocks[oldBlocks.length - 1];
       boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
           fsDir, path, iip, file, oldBlock);
       if (!removed && !(op instanceof UpdateBlocksOp)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index ab179b4..2a8231a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -267,10 +267,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       out.println("No. of corrupted Replica: " +
           numberReplicas.corruptReplicas());
       //record datanodes that have corrupted block replica
-      Collection<DatanodeDescriptor> corruptionRecord = null;
-      if (bm.getCorruptReplicas(block) != null) {
-        corruptionRecord = bm.getCorruptReplicas(block);
-      }
+      Collection<DatanodeDescriptor> corruptionRecord =
+          bm.getCorruptReplicas(blockInfo);
 
       //report block replicas status on datanodes
       for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) {
@@ -279,7 +277,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
             dn.getNetworkLocation() + " ");
         if (corruptionRecord != null && corruptionRecord.contains(dn)) {
           out.print(CORRUPT_STATUS+"\t ReasonCode: "+
-            bm.getCorruptReason(block,dn));
+            bm.getCorruptReason(blockInfo, dn));
         } else if (dn.isDecommissioned() ){
           out.print(DECOMMISSIONED_STATUS);
         } else if (dn.isDecommissionInProgress()) {
@@ -650,7 +648,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
               LightWeightLinkedSet<BlockInfo> blocksExcess =
                   bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
               Collection<DatanodeDescriptor> corruptReplicas =
-                  bm.getCorruptReplicas(block.getLocalBlock());
+                  bm.getCorruptReplicas(storedBlock);
               sb.append("(");
               if (dnDesc.isDecommissioned()) {
                 sb.append("DECOMMISSIONED)");
@@ -658,7 +656,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                 sb.append("DECOMMISSIONING)");
               } else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) {
                 sb.append("CORRUPT)");
-              } else if (blocksExcess != null && blocksExcess.contains(block.getLocalBlock())) {
+              } else if (blocksExcess != null && blocksExcess.contains(storedBlock)) {
                 sb.append("EXCESS)");
               } else if (dnDesc.isStale(this.staleInterval)) {
                 sb.append("STALE_NODE)");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 89ee674..af1e023 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -560,7 +560,8 @@ public class DFSTestUtil {
       throws TimeoutException, InterruptedException {
     int count = 0;
     final int ATTEMPTS = 50;
-    int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
+    int repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
+        b.getLocalBlock());
     while (repls != corruptRepls && count < ATTEMPTS) {
       try {
         IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
@@ -572,7 +573,8 @@ public class DFSTestUtil {
       count++;
       // check more often so corrupt block reports are not easily missed
       for (int i = 0; i < 10; i++) {
-        repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
+        repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
+            b.getLocalBlock());
         Thread.sleep(100);
         if (repls == corruptRepls) {
           break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 148135b..a899891 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -87,7 +88,7 @@ public class BlockManagerTestUtil {
       final Block b) {
     final Set<String> rackSet = new HashSet<String>(0);
     final Collection<DatanodeDescriptor> corruptNodes = 
-       getCorruptReplicas(blockManager).getNodes(b);
+       getCorruptReplicas(blockManager).getNodes(blockManager.getStoredBlock(b));
     for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@@ -306,4 +307,8 @@ public class BlockManagerTestUtil {
       throws ExecutionException, InterruptedException {
     dm.getDecomManager().runMonitor();
   }
+
+  public static int numCorruptReplicas(BlockManager bm, Block block) {
+    return bm.corruptReplicas.numCorruptReplicas(bm.getStoredBlock(block));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index bae4f1d..c23f3d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -63,9 +63,7 @@ public class TestBlockInfo {
 
     final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
 
-    boolean added = blockInfo.addStorage(storage, blockInfo);
-
-    Assert.assertTrue(added);
+    blockInfo.addStorage(storage, blockInfo);
     Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
   }
 
@@ -73,7 +71,7 @@ public class TestBlockInfo {
   public void testCopyConstructor() {
     BlockInfo old = new BlockInfoContiguous((short) 3);
     try {
-      BlockInfo copy = new BlockInfoContiguous((BlockInfoContiguous)old);
+      BlockInfo copy = new BlockInfoContiguous(old);
       assertEquals(old.getBlockCollection(), copy.getBlockCollection());
       assertEquals(old.getCapacity(), copy.getCapacity());
     } catch (Exception e) {
@@ -110,8 +108,8 @@ public class TestBlockInfo {
     final int MAX_BLOCKS = 10;
 
     DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
-    ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
-    ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
+    ArrayList<Block> blockList = new ArrayList<>(MAX_BLOCKS);
+    ArrayList<BlockInfo> blockInfoList = new ArrayList<>();
     int headIndex;
     int curIndex;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 9e31670..f6cc747 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -509,7 +509,7 @@ public class TestBlockManager {
         + " even if all available source nodes have reached their replication"
         + " limits below the hard limit.",
         bm.chooseSourceDatanode(
-            aBlock,
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -519,7 +519,7 @@ public class TestBlockManager {
         + " replication since all available source nodes have reached"
         + " their replication limits.",
         bm.chooseSourceDatanode(
-            aBlock,
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -532,7 +532,7 @@ public class TestBlockManager {
     assertNull("Does not choose a source node for a highest-priority"
         + " replication when all available nodes exceed the hard limit.",
         bm.chooseSourceDatanode(
-            aBlock,
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -558,7 +558,7 @@ public class TestBlockManager {
         + " if all available source nodes have reached their replication"
         + " limits below the hard limit.",
         bm.chooseSourceDatanode(
-            aBlock,
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -572,7 +572,7 @@ public class TestBlockManager {
     assertNull("Does not choose a source decommissioning node for a normal"
         + " replication when all available nodes exceed the hard limit.",
         bm.chooseSourceDatanode(
-            aBlock,
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b63d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
index 21fb54e..1a49bee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
@@ -48,20 +48,19 @@ public class TestCorruptReplicaInfo {
   private static final Log LOG = 
                            LogFactory.getLog(TestCorruptReplicaInfo.class);
   
-  private final Map<Long, Block> block_map =
-    new HashMap<Long, Block>();  
+  private final Map<Long, BlockInfo> block_map = new HashMap<>();
     
   // Allow easy block creation by block id
   // Return existing block if one with same block id already exists
-  private Block getBlock(Long block_id) {
+  private BlockInfo getBlock(Long block_id) {
     if (!block_map.containsKey(block_id)) {
-      block_map.put(block_id, new Block(block_id,0,0));
+      block_map.put(block_id,
+          new BlockInfoContiguous(new Block(block_id, 0, 0), (short) 1));
     }
-    
     return block_map.get(block_id);
   }
   
-  private Block getBlock(int block_id) {
+  private BlockInfo getBlock(int block_id) {
     return getBlock((long)block_id);
   }
   
@@ -82,7 +81,7 @@ public class TestCorruptReplicaInfo {
       // create a list of block_ids. A list is used to allow easy validation of the
       // output of getCorruptReplicaBlockIds
       int NUM_BLOCK_IDS = 140;
-      List<Long> block_ids = new LinkedList<Long>();
+      List<Long> block_ids = new LinkedList<>();
       for (int i=0;i<NUM_BLOCK_IDS;i++) {
         block_ids.add((long)i);
       }
@@ -130,7 +129,7 @@ public class TestCorruptReplicaInfo {
   }
   
   private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
-      Block blk, DatanodeDescriptor dn) {
+      BlockInfo blk, DatanodeDescriptor dn) {
     crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE);
   }
 }


[12/24] hadoop git commit: MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo if mapId is not in the cache. Contributed by zhihai xu.

Posted by ar...@apache.org.
MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to
getMapOutputInfo if mapId is not in the cache. Contributed by zhihai xu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff67dfe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff67dfe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff67dfe

Branch: refs/heads/HDFS-7240
Commit: bff67dfe2f811654ffb1bbcbd87509c185f452b6
Parents: 688617d
Author: Devaraj K <de...@apache.org>
Authored: Mon Jul 6 13:46:37 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Mon Jul 6 13:46:37 2015 +0530

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../apache/hadoop/mapred/ShuffleHandler.java    |   3 +-
 .../hadoop/mapred/TestShuffleHandler.java       | 101 +++++++++++++++++++
 3 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2f80615..2458403 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -542,6 +542,9 @@ Release 2.7.2 - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo
+    if mapId is not in the cache. (zhihai xu via devaraj)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index eedf42b..ee1be23 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -815,7 +815,8 @@ public class ShuffleHandler extends AuxiliaryService {
         try {
           MapOutputInfo info = mapOutputInfoMap.get(mapId);
           if (info == null) {
-            info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+            info = getMapOutputInfo(outputBasePathStr + mapId,
+                mapId, reduceId, user);
           }
           lastMap =
               sendMapOutput(ctx, ch, user, mapId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 7053653..746071f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -601,6 +601,7 @@ public class TestShuffleHandler {
       Assert.assertTrue((new String(byteArr)).contains(message));
     } finally {
       shuffleHandler.stop();
+      FileUtil.fullyDelete(absLogDir);
     }
   }
 
@@ -829,4 +830,104 @@ public class TestShuffleHandler {
     conn.disconnect();
     return rc;
   }
+
+  @Test(timeout = 100000)
+  public void testGetMapOutputInfo() throws Exception {
+    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    File absLogDir = new File("target", TestShuffleHandler.class.
+        getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    String reducerId = "0";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void populateHeaders(List<String> mapIds,
+              String outputBaseStr, String user, int reduce,
+              HttpRequest request, HttpResponse response,
+              boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
+              throws IOException {
+            // Only set response headers and skip everything else
+            // send some dummy value for content-length
+            super.setResponseHeaders(response, keepAliveParam, 100);
+          }
+          @Override
+          protected void verifyRequest(String appid,
+              ChannelHandlerContext ctx, HttpRequest request,
+              HttpResponse response, URL requestUri) throws IOException {
+            // Do nothing.
+          }
+          @Override
+          protected void sendError(ChannelHandlerContext ctx, String message,
+              HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error(message));
+              ctx.getChannel().close();
+            }
+          }
+          @Override
+          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+              Channel ch, String user, String mapId, int reduce,
+              MapOutputInfo info) throws IOException {
+            // send a shuffle header
+            ShuffleHeader header =
+                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+          "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler
+          .initializeApplication(new ApplicationInitializationContext(user,
+          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+          outputBuffer.getLength())));
+      URL url =
+          new URL(
+              "http://127.0.0.1:"
+                  + shuffleHandler.getConfig().get(
+                      ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+                  + "&map=attempt_12345_1_m_1_0");
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      conn.connect();
+      try {
+        DataInputStream is = new DataInputStream(conn.getInputStream());
+        ShuffleHeader header = new ShuffleHeader();
+        header.readFields(is);
+        is.close();
+      } catch (EOFException e) {
+        // ignore
+      }
+      Assert.assertEquals(failures.size(), 0);
+    } finally {
+      shuffleHandler.stop();
+      FileUtil.fullyDelete(absLogDir);
+    }
+  }
 }


[05/24] hadoop git commit: HADOOP-12172. FsShell mkdir -p makes an unnecessary check for the existence of the parent. Contributed by Chris Nauroth.

Posted by ar...@apache.org.
HADOOP-12172. FsShell mkdir -p makes an unnecessary check for the existence of the parent. Contributed by Chris Nauroth.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f3796224
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f3796224
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f3796224

Branch: refs/heads/HDFS-7240
Commit: f3796224bfdfd88e2428cc8a9915bdfdc62b48f3
Parents: a78d507
Author: cnauroth <cn...@apache.org>
Authored: Wed Jul 1 19:47:58 2015 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Wed Jul 1 19:47:58 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt                   | 3 +++
 .../src/main/java/org/apache/hadoop/fs/shell/Mkdir.java           | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3796224/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 24431ba..312a996 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -704,6 +704,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12112. Make hadoop-common-project Native code -Wall-clean
     (alanburlison via cmccabe)
 
+    HADOOP-12172. FsShell mkdir -p makes an unnecessary check for the existence
+    of the parent. (cnauroth)
+
   BUG FIXES
 
     HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3796224/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
index 74bad62..9f39da2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Mkdir.java
@@ -70,7 +70,8 @@ class Mkdir extends FsCommand {
   protected void processNonexistentPath(PathData item) throws IOException {
     // check if parent exists. this is complicated because getParent(a/b/c/) returns a/b/c, but
     // we want a/b
-    if (!item.fs.exists(new Path(item.path.toString()).getParent()) && !createParents) {
+    if (!createParents &&
+        !item.fs.exists(new Path(item.path.toString()).getParent())) {
       throw new PathNotFoundException(item.toString());
     }
     if (!item.fs.mkdirs(item.path)) {


[14/24] hadoop git commit: HDFS-8686. WebHdfsFileSystem#getXAttr(Path p, final String name) doesn't work if namespace is in capitals (Contributed by kanaka kumar avvaru)

Posted by ar...@apache.org.
HDFS-8686. WebHdfsFileSystem#getXAttr(Path p, final String name) doesn't work if namespace is in capitals (Contributed by kanaka kumar avvaru)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc92d3e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc92d3e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc92d3e6

Branch: refs/heads/HDFS-7240
Commit: fc92d3e6515a391847cb6170244b3d911712d96a
Parents: 233cab8
Author: Vinayakumar B <vi...@apache.org>
Authored: Mon Jul 6 16:09:24 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Mon Jul 6 16:09:24 2015 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/web/JsonUtilClient.java    | 14 ++++++++++++++
 .../org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java |  2 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt           |  3 +++
 .../hadoop/hdfs/server/namenode/FSXAttrBaseTest.java  |  5 ++++-
 .../TestOfflineImageViewerForXAttr.java               |  3 +++
 5 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
index e025e31..713836c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
@@ -413,6 +413,20 @@ class JsonUtilClient {
     return null;
   }
 
+  /** Expects only a single XAttr in the map and returns its value. */
+  static byte[] getXAttr(final Map<?, ?> json) throws IOException {
+    if (json == null) {
+      return null;
+    }
+
+    Map<String, byte[]> xAttrs = toXAttrs(json);
+    if (xAttrs != null && !xAttrs.values().isEmpty()) {
+      return xAttrs.values().iterator().next();
+    }
+
+    return null;
+  }
+
   static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
       throws IOException {
     if (json == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 2650dca..b661d07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -963,7 +963,7 @@ public class WebHdfsFileSystem extends FileSystem
         new XAttrEncodingParam(XAttrCodec.HEX)) {
       @Override
       byte[] decodeResponse(Map<?, ?> json) throws IOException {
-        return JsonUtilClient.getXAttr(json, name);
+        return JsonUtilClient.getXAttr(json);
       }
     }.run();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4f184fb..9edc2af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1005,6 +1005,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8577. Avoid retrying to recover lease on a file which does not exist
     (J.Andreina via vinayakumarb)
 
+    HDFS-8686. WebHdfsFileSystem#getXAttr(Path p, final String name) doesn't
+    work if namespace is in capitals (kanaka kumar avvaru via vinayakumarb)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
index e21e34c..eb9053c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
@@ -395,7 +395,10 @@ public class FSXAttrBaseTest {
     FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
     fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
     fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
-    
+
+    final byte[] theValue = fs.getXAttr(path, "USER.a2");
+    Assert.assertArrayEquals(value2, theValue);
+
     /* An XAttr that was requested does not exist. */
     try {
       final byte[] value = fs.getXAttr(path, name3);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc92d3e6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
index 3f23f64..6c82101 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForXAttr.java
@@ -231,6 +231,9 @@ public class TestOfflineImageViewerForXAttr {
           "user.attr1"));
       assertEquals("value1", value);
 
+      value = new String(webhdfs.getXAttr(new Path("/dir1"), "USER.attr1"));
+      assertEquals("value1", value);
+
       Map<String, byte[]> contentMap = webhdfs.getXAttrs(new Path("/dir1"),
           names);
 


[15/24] hadoop git commit: HADOOP-12045. Enable LocalFileSystem#setTimes to change atime. Contributed by Kazuho Fujii.

Posted by ar...@apache.org.
HADOOP-12045. Enable LocalFileSystem#setTimes to change atime. Contributed by Kazuho Fujii.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed1e3ce4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed1e3ce4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed1e3ce4

Branch: refs/heads/HDFS-7240
Commit: ed1e3ce482f679ae2fad43a203f6578d7af59327
Parents: fc92d3e
Author: cnauroth <cn...@apache.org>
Authored: Mon Jul 6 13:40:15 2015 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Mon Jul 6 13:40:15 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../apache/hadoop/fs/RawLocalFileSystem.java    | 36 ++++++-----
 .../org/apache/hadoop/fs/SymlinkBaseTest.java   | 45 +++++++++++---
 .../apache/hadoop/fs/TestLocalFileSystem.java   | 26 ++++++--
 .../apache/hadoop/fs/TestSymlinkLocalFS.java    | 18 ++++++
 .../hadoop/fs/shell/TestCopyPreserveFlag.java   | 63 ++++++++++----------
 6 files changed, 132 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1d737e5..f2f9d5c 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -675,6 +675,9 @@ Release 2.8.0 - UNRELEASED
 
     HADOOP-12171. Shorten overly-long htrace span names for server (cmccabe)
 
+    HADOOP-12045. Enable LocalFileSystem#setTimes to change atime.
+    (Kazuho Fujii via cnauroth)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index 96d1ab4..ac65b62 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -33,6 +33,10 @@ import java.io.OutputStream;
 import java.io.FileDescriptor;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.BasicFileAttributeView;
+import java.nio.file.attribute.FileTime;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.StringTokenizer;
@@ -644,9 +648,14 @@ public class RawLocalFileSystem extends FileSystem {
       return !super.getOwner().isEmpty(); 
     }
     
-    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
+    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)
+      throws IOException {
       super(f.length(), f.isDirectory(), 1, defaultBlockSize,
-          f.lastModified(), new Path(f.getPath()).makeQualified(fs.getUri(),
+          f.lastModified(),
+          Files.readAttributes(f.toPath(),
+            BasicFileAttributes.class).lastAccessTime().toMillis(),
+          null, null, null,
+          new Path(f.getPath()).makeQualified(fs.getUri(),
             fs.getWorkingDirectory()));
     }
     
@@ -758,25 +767,20 @@ public class RawLocalFileSystem extends FileSystem {
   }
  
   /**
-   * Sets the {@link Path}'s last modified time <em>only</em> to the given
-   * valid time.
+   * Sets the {@link Path}'s last modified time and last access time to
+   * the given valid times.
    *
    * @param mtime the modification time to set (only if greater than zero).
-   * @param atime currently ignored.
-   * @throws IOException if setting the last modified time fails.
+   * @param atime the access time to set (only if non-negative).
+   * @throws IOException if setting the times fails.
    */
   @Override
   public void setTimes(Path p, long mtime, long atime) throws IOException {
-    File f = pathToFile(p);
-    if(mtime >= 0) {
-      if(!f.setLastModified(mtime)) {
-        throw new IOException(
-          "couldn't set last-modified time to " +
-          mtime +
-          " for " +
-          f.getAbsolutePath());
-      }
-    }
+    BasicFileAttributeView view = Files.getFileAttributeView(
+        pathToFile(p).toPath(), BasicFileAttributeView.class);
+    FileTime fmtime = (mtime >= 0) ? FileTime.fromMillis(mtime) : null;
+    FileTime fatime = (atime >= 0) ? FileTime.fromMillis(atime) : null;
+    view.setTimes(fmtime, fatime, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
index 4d6485d..8018946 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
@@ -1386,19 +1386,48 @@ public abstract class SymlinkBaseTest {
   }
 
   @Test(timeout=10000)
-  /** setTimes affects the target not the link */
-  public void testSetTimes() throws IOException {
+  /** setTimes affects the target file not the link */
+  public void testSetTimesSymlinkToFile() throws IOException {
     Path file = new Path(testBaseDir1(), "file");
     Path link = new Path(testBaseDir1(), "linkToFile");
     createAndWriteFile(file);
     wrapper.createSymlink(file, link, false);
     long at = wrapper.getFileLinkStatus(link).getAccessTime();
-    wrapper.setTimes(link, 2L, 3L);
-    // NB: local file systems don't implement setTimes
-    if (!"file".equals(getScheme())) {
-      assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
-      assertEquals(3, wrapper.getFileStatus(file).getAccessTime());
-      assertEquals(2, wrapper.getFileStatus(file).getModificationTime());
+    // the local file system may not support millisecond timestamps
+    wrapper.setTimes(link, 2000L, 3000L);
+    assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
+    assertEquals(2000, wrapper.getFileStatus(file).getModificationTime());
+    assertEquals(3000, wrapper.getFileStatus(file).getAccessTime());
+  }
+
+  @Test(timeout=10000)
+  /** setTimes affects the target directory not the link */
+  public void testSetTimesSymlinkToDir() throws IOException {
+    Path dir = new Path(testBaseDir1(), "dir");
+    Path link = new Path(testBaseDir1(), "linkToDir");
+    wrapper.mkdir(dir, FileContext.DEFAULT_PERM, false);
+    wrapper.createSymlink(dir, link, false);
+    long at = wrapper.getFileLinkStatus(link).getAccessTime();
+    // the local file system may not support millisecond timestamps
+    wrapper.setTimes(link, 2000L, 3000L);
+    assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
+    assertEquals(2000, wrapper.getFileStatus(dir).getModificationTime());
+    assertEquals(3000, wrapper.getFileStatus(dir).getAccessTime());
+  }
+
+  @Test(timeout=10000)
+  /** setTimes does not affect the link even though target does not exist */
+  public void testSetTimesDanglingLink() throws IOException {
+    Path file = new Path("/noSuchFile");
+    Path link = new Path(testBaseDir1()+"/link");
+    wrapper.createSymlink(file, link, false);
+    long at = wrapper.getFileLinkStatus(link).getAccessTime();
+    try {
+      wrapper.setTimes(link, 2000L, 3000L);
+      fail("set times to non-existant file");
+    } catch (IOException e) {
+      // Expected
     }
+    assertEquals(at, wrapper.getFileLinkStatus(link).getAccessTime());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index df5cba9..f641f04 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -378,7 +378,14 @@ public class TestLocalFileSystem {
     assertTrue(dataFileFound);
     assertTrue(checksumFileFound);
   }
-  
+
+  private void checkTimesStatus(Path path,
+    long expectedModTime, long expectedAccTime) throws IOException {
+    FileStatus status = fileSys.getFileStatus(path);
+    assertEquals(expectedModTime, status.getModificationTime());
+    assertEquals(expectedAccTime, status.getAccessTime());
+  }
+
   @Test(timeout = 1000)
   public void testSetTimes() throws Exception {
     Path path = new Path(TEST_ROOT_DIR, "set-times");
@@ -387,15 +394,24 @@ public class TestLocalFileSystem {
     // test only to the nearest second, as the raw FS may not
     // support millisecond timestamps
     long newModTime = 12345000;
+    long newAccTime = 23456000;
 
     FileStatus status = fileSys.getFileStatus(path);
     assertTrue("check we're actually changing something", newModTime != status.getModificationTime());
-    long accessTime = status.getAccessTime();
+    assertTrue("check we're actually changing something", newAccTime != status.getAccessTime());
+
+    fileSys.setTimes(path, newModTime, newAccTime);
+    checkTimesStatus(path, newModTime, newAccTime);
+
+    newModTime = 34567000;
 
     fileSys.setTimes(path, newModTime, -1);
-    status = fileSys.getFileStatus(path);
-    assertEquals(newModTime, status.getModificationTime());
-    assertEquals(accessTime, status.getAccessTime());
+    checkTimesStatus(path, newModTime, newAccTime);
+
+    newAccTime = 45678000;
+
+    fileSys.setTimes(path, -1, newAccTime);
+    checkTimesStatus(path, newModTime, newAccTime);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
index 64e34af..602af97 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestSymlinkLocalFS.java
@@ -231,4 +231,22 @@ abstract public class TestSymlinkLocalFS extends SymlinkBaseTest {
       // Expected.
     }
   }
+
+  @Override
+  public void testSetTimesSymlinkToFile() throws IOException {
+    assumeTrue(!Path.WINDOWS);
+    super.testSetTimesSymlinkToFile();
+  }
+
+  @Override
+  public void testSetTimesSymlinkToDir() throws IOException {
+    assumeTrue(!Path.WINDOWS);
+    super.testSetTimesSymlinkToDir();
+  }
+
+  @Override
+  public void testSetTimesDanglingLink() throws IOException {
+    assumeTrue(!Path.WINDOWS);
+    super.testSetTimesDanglingLink();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed1e3ce4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
index ecfb5a5..263c697 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
@@ -18,12 +18,13 @@
 package org.apache.hadoop.fs.shell;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,8 +39,12 @@ import org.junit.Test;
 
 public class TestCopyPreserveFlag {
   private static final int MODIFICATION_TIME = 12345000;
-  private static final Path FROM = new Path("d1", "f1");
-  private static final Path TO = new Path("d2", "f2");
+  private static final int ACCESS_TIME = 23456000;
+  private static final Path DIR_FROM = new Path("d0");
+  private static final Path DIR_TO1 = new Path("d1");
+  private static final Path DIR_TO2 = new Path("d2");
+  private static final Path FROM = new Path(DIR_FROM, "f0");
+  private static final Path TO = new Path(DIR_TO1, "f1");
   private static final FsPermission PERMISSIONS = new FsPermission(
     FsAction.ALL,
     FsAction.EXECUTE,
@@ -62,8 +67,8 @@ public class TestCopyPreserveFlag {
 
     FileSystem.setDefaultUri(conf, fs.getUri());
     fs.setWorkingDirectory(testDir);
-    fs.mkdirs(new Path("d1"));
-    fs.mkdirs(new Path("d2"));
+    fs.mkdirs(DIR_FROM);
+    fs.mkdirs(DIR_TO1);
     fs.createNewFile(FROM);
 
     FSDataOutputStream output = fs.create(FROM, true);
@@ -72,10 +77,10 @@ public class TestCopyPreserveFlag {
         output.writeChar('\n');
     }
     output.close();
-    fs.setTimes(FROM, MODIFICATION_TIME, 0);
+    fs.setTimes(FROM, MODIFICATION_TIME, ACCESS_TIME);
     fs.setPermission(FROM, PERMISSIONS);
-    fs.setTimes(new Path("d1"), MODIFICATION_TIME, 0);
-    fs.setPermission(new Path("d1"), PERMISSIONS);
+    fs.setTimes(DIR_FROM, MODIFICATION_TIME, ACCESS_TIME);
+    fs.setPermission(DIR_FROM, PERMISSIONS);
   }
 
   @After
@@ -84,14 +89,18 @@ public class TestCopyPreserveFlag {
     fs.close();
   }
 
-  private void assertAttributesPreserved() throws IOException {
-    assertEquals(MODIFICATION_TIME, fs.getFileStatus(TO).getModificationTime());
-    assertEquals(PERMISSIONS, fs.getFileStatus(TO).getPermission());
+  private void assertAttributesPreserved(Path to) throws IOException {
+    FileStatus status = fs.getFileStatus(to);
+    assertEquals(MODIFICATION_TIME, status.getModificationTime());
+    assertEquals(ACCESS_TIME, status.getAccessTime());
+    assertEquals(PERMISSIONS, status.getPermission());
   }
 
-  private void assertAttributesChanged() throws IOException {
-      assertTrue(MODIFICATION_TIME != fs.getFileStatus(TO).getModificationTime());
-      assertTrue(!PERMISSIONS.equals(fs.getFileStatus(TO).getPermission()));
+  private void assertAttributesChanged(Path to) throws IOException {
+    FileStatus status = fs.getFileStatus(to);
+    assertNotEquals(MODIFICATION_TIME, status.getModificationTime());
+    assertNotEquals(ACCESS_TIME, status.getAccessTime());
+    assertNotEquals(PERMISSIONS, status.getPermission());
   }
 
   private void run(CommandWithDestination cmd, String... args) {
@@ -102,54 +111,48 @@ public class TestCopyPreserveFlag {
   @Test(timeout = 10000)
   public void testPutWithP() throws Exception {
     run(new Put(), "-p", FROM.toString(), TO.toString());
-    assertAttributesPreserved();
+    assertAttributesPreserved(TO);
   }
 
   @Test(timeout = 10000)
   public void testPutWithoutP() throws Exception {
     run(new Put(), FROM.toString(), TO.toString());
-    assertAttributesChanged();
+    assertAttributesChanged(TO);
   }
 
   @Test(timeout = 10000)
   public void testGetWithP() throws Exception {
     run(new Get(), "-p", FROM.toString(), TO.toString());
-    assertAttributesPreserved();
+    assertAttributesPreserved(TO);
   }
 
   @Test(timeout = 10000)
   public void testGetWithoutP() throws Exception {
     run(new Get(), FROM.toString(), TO.toString());
-    assertAttributesChanged();
+    assertAttributesChanged(TO);
   }
 
   @Test(timeout = 10000)
   public void testCpWithP() throws Exception {
       run(new Cp(), "-p", FROM.toString(), TO.toString());
-      assertAttributesPreserved();
+      assertAttributesPreserved(TO);
   }
 
   @Test(timeout = 10000)
   public void testCpWithoutP() throws Exception {
       run(new Cp(), FROM.toString(), TO.toString());
-      assertAttributesChanged();
+      assertAttributesChanged(TO);
   }
 
   @Test(timeout = 10000)
   public void testDirectoryCpWithP() throws Exception {
-    run(new Cp(), "-p", "d1", "d3");
-    assertEquals(fs.getFileStatus(new Path("d1")).getModificationTime(),
-        fs.getFileStatus(new Path("d3")).getModificationTime());
-    assertEquals(fs.getFileStatus(new Path("d1")).getPermission(),
-        fs.getFileStatus(new Path("d3")).getPermission());
+    run(new Cp(), "-p", DIR_FROM.toString(), DIR_TO2.toString());
+    assertAttributesPreserved(DIR_TO2);
   }
 
   @Test(timeout = 10000)
   public void testDirectoryCpWithoutP() throws Exception {
-    run(new Cp(), "d1", "d4");
-    assertTrue(fs.getFileStatus(new Path("d1")).getModificationTime() !=
-        fs.getFileStatus(new Path("d4")).getModificationTime());
-    assertTrue(!fs.getFileStatus(new Path("d1")).getPermission()
-        .equals(fs.getFileStatus(new Path("d4")).getPermission()));
+    run(new Cp(), DIR_FROM.toString(), DIR_TO2.toString());
+    assertAttributesChanged(DIR_TO2);
   }
 }


[08/24] hadoop git commit: HDFS-8709. Clarify automatic sync in FSEditLog#logEdit.

Posted by ar...@apache.org.
HDFS-8709. Clarify automatic sync in FSEditLog#logEdit.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5fddc517
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5fddc517
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5fddc517

Branch: refs/heads/HDFS-7240
Commit: 5fddc5177ddad07a735d49c15a63cfc5f74d0891
Parents: bff5999
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Jul 2 10:26:40 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Jul 2 10:26:40 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  2 ++
 .../hadoop/hdfs/server/namenode/FSEditLog.java  | 28 +++++++++++++-------
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fddc517/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7b96c56..6678a3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -699,6 +699,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
     (vinayakumarb)
 
+    HDFS-8709. Clarify automatic sync in FSEditLog#logEdit. (wang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fddc517/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 1b0b572..939e841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -409,10 +409,14 @@ public class FSEditLog implements LogsPurgeable {
   }
 
   /**
-   * Write an operation to the edit log. Do not sync to persistent
-   * store yet.
+   * Write an operation to the edit log.
+   * <p>
+   * Additionally, this will sync the edit log if required by the underlying
+   * edit stream's automatic sync policy (e.g. when the buffer is full, or
+   * if a time interval has elapsed).
    */
   void logEdit(final FSEditLogOp op) {
+    boolean needsSync = false;
     synchronized (this) {
       assert isOpenForWrite() :
         "bad state: " + state;
@@ -434,14 +438,16 @@ public class FSEditLog implements LogsPurgeable {
       endTransaction(start);
       
       // check if it is time to schedule an automatic sync
-      if (!shouldForceSync()) {
-        return;
+      needsSync = shouldForceSync();
+      if (needsSync) {
+        isAutoSyncScheduled = true;
       }
-      isAutoSyncScheduled = true;
     }
     
-    // sync buffered edit log entries to persistent store
-    logSync();
+    // Sync the log if an automatic sync is required.
+    if (needsSync) {
+      logSync();
+    }
   }
 
   /**
@@ -1191,11 +1197,13 @@ public class FSEditLog implements LogsPurgeable {
       throws IOException {
     return journalSet.getEditLogManifest(fromTxId);
   }
- 
+
   /**
    * Finalizes the current edit log and opens a new log segment.
-   * @return the transaction id of the BEGIN_LOG_SEGMENT transaction
-   * in the new log.
+   *
+   * @param layoutVersion The layout version of the new edit log segment.
+   * @return the transaction id of the BEGIN_LOG_SEGMENT transaction in the new
+   * log.
    */
   synchronized long rollEditLog(int layoutVersion) throws IOException {
     LOG.info("Rolling edit logs");