Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/04 20:25:10 UTC

[01/23] hadoop git commit: HADOOP-15152. Typo in javadoc of ReconfigurableBase#reconfigurePropertyImpl. Contributed by Nanda kumar.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 a018c2949 -> 35c2d261d


HADOOP-15152. Typo in javadoc of ReconfigurableBase#reconfigurePropertyImpl. Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dfe0cd86
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dfe0cd86
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dfe0cd86

Branch: refs/heads/HDFS-7240
Commit: dfe0cd86553bd2688603ea382ea593171d520471
Parents: 7fe6f83
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Jan 2 10:50:13 2018 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Jan 2 10:50:13 2018 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dfe0cd86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
index a705279..231cd3a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
@@ -262,7 +262,7 @@ public abstract class ReconfigurableBase
    * all internal data structures derived from the configuration property
    * that is being changed. If this object owns other Reconfigurable objects
    * reconfigureProperty should be called recursively to make sure that
-   * to make sure that the configuration of these objects is updated.
+   * the configuration of these objects are updated.
    *
    * @param property Name of the property that is being reconfigured.
    * @param newVal Proposed new value of the property.
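
The corrected sentence describes a propagation rule: an object that owns other Reconfigurable objects should forward the property change so that their configuration is updated as well. Below is a small standalone sketch of that delegation pattern; it intentionally does not reproduce the real ReconfigurableBase signatures, and every name in it (SimpleReconfigurable, ParentComponent, child.cache.size) is illustrative only.

  interface SimpleReconfigurable {
    void reconfigureProperty(String property, String newVal);
  }

  class ChildComponent implements SimpleReconfigurable {
    private volatile int cacheSize = 128;

    @Override
    public void reconfigureProperty(String property, String newVal) {
      if ("child.cache.size".equals(property)) {
        cacheSize = Integer.parseInt(newVal); // rebuild state derived from the property
      }
    }
  }

  class ParentComponent implements SimpleReconfigurable {
    private final ChildComponent child = new ChildComponent();

    @Override
    public void reconfigureProperty(String property, String newVal) {
      // Update the parent's own derived state first, then recurse so the
      // child's configuration is updated too, as the javadoc advises.
      child.reconfigureProperty(property, newVal);
    }
  }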


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[18/23] hadoop git commit: HDFS-12948. DiskBalancer report command top option should only take positive numeric values. Contributed by Shashikant Banerjee.

Posted by ae...@apache.org.
HDFS-12948. DiskBalancer report command top option should only take positive numeric values. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a48b359
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a48b359
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a48b359

Branch: refs/heads/HDFS-7240
Commit: 2a48b3594c502c4dcf201f2b60386383c0d9ae91
Parents: 7a55044
Author: Yiqun Lin <yq...@apache.org>
Authored: Thu Jan 4 10:48:44 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Thu Jan 4 10:48:44 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hdfs/server/diskbalancer/command/Command.java    | 7 ++++++-
 .../hdfs/server/diskbalancer/command/ReportCommand.java     | 2 +-
 .../diskbalancer/command/TestDiskBalancerCommand.java       | 9 +++++++++
 3 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a48b359/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
index eeb7241..8eacdec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
@@ -501,7 +501,8 @@ public abstract class Command extends Configured implements Closeable {
    * Parse top number of nodes to be processed.
    * @return top number of nodes to be processed.
    */
-  protected int parseTopNodes(final CommandLine cmd, final StrBuilder result) {
+  protected int parseTopNodes(final CommandLine cmd, final StrBuilder result)
+      throws IllegalArgumentException {
     String outputLine = "";
     int nodes = 0;
     final String topVal = cmd.getOptionValue(DiskBalancerCLI.TOP);
@@ -523,6 +524,10 @@ public abstract class Command extends Configured implements Closeable {
         result.appendln(outputLine);
         nodes = getDefaultTop();
       }
+      if (nodes <= 0) {
+        throw new IllegalArgumentException(
+            "Top limit input should be a positive numeric value");
+      }
     }
 
     return Math.min(nodes, cluster.getNodes().size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a48b359/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java
index b224b11..58ef5ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java
@@ -100,7 +100,7 @@ public class ReportCommand extends Command {
   }
 
   private void handleTopReport(final CommandLine cmd, final StrBuilder result,
-      final String nodeFormat) {
+      final String nodeFormat) throws IllegalArgumentException {
     Collections.sort(getCluster().getNodes(), Collections.reverseOrder());
 
     /* extract value that identifies top X DataNode(s) */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a48b359/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
index 1cebae0..6fde209 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
@@ -244,6 +244,15 @@ public class TestDiskBalancerCommand {
 
   }
 
+  /* test basic report with negative top limit */
+  @Test(timeout = 60000)
+  public void testReportWithNegativeTopLimit()
+      throws Exception {
+    final String cmdLine = "hdfs diskbalancer -report -top -32";
+    thrown.expect(java.lang.IllegalArgumentException.class);
+    thrown.expectMessage("Top limit input should be a positive numeric value");
+    runCommand(cmdLine);
+  }
   /* test less than 64 DataNode(s) as total, e.g., -report -top 32 */
   @Test(timeout = 60000)
   public void testReportLessThanTotal() throws Exception {
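
The change above rejects zero and negative values for the -top option and caps the result at the cluster size. Below is a self-contained sketch of the same parse-and-validate flow, with the exception message taken from the patch; the class and method names (TopLimitParser, parseTopLimit) and the default of 100 are assumptions for illustration.

  final class TopLimitParser {
    private static final int DEFAULT_TOP = 100; // assumed default, for illustration only

    static int parseTopLimit(String topVal, int clusterSize) {
      int nodes;
      try {
        nodes = Integer.parseInt(topVal.trim());
      } catch (NumberFormatException nfe) {
        nodes = DEFAULT_TOP; // unreadable input falls back to the default
      }
      if (nodes <= 0) {
        throw new IllegalArgumentException(
            "Top limit input should be a positive numeric value");
      }
      return Math.min(nodes, clusterSize); // never report more nodes than exist
    }

    public static void main(String[] args) {
      System.out.println(parseTopLimit("32", 200));  // prints 32
      System.out.println(parseTopLimit("-32", 200)); // throws IllegalArgumentException
    }
  }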




[15/23] hadoop git commit: HADOOP-15093. Deprecation of yarn.resourcemanager.zk-address is undocumented. Contributed by Ajay Kumar.

Posted by ae...@apache.org.
HADOOP-15093. Deprecation of yarn.resourcemanager.zk-address is undocumented. Contributed by Ajay Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4379113b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4379113b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4379113b

Branch: refs/heads/HDFS-7240
Commit: 4379113bda98eab47b00d17d175d57074230aac9
Parents: 2f6c038
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Jan 3 14:32:47 2018 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Jan 3 14:32:47 2018 -0800

----------------------------------------------------------------------
 .../src/site/markdown/ResourceManagerHA.md              |  2 +-
 .../src/site/markdown/ResourceManagerRestart.md         | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4379113b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
index 61eb773..da9f5a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
@@ -56,7 +56,7 @@ Most of the failover functionality is tunable using various configuration proper
 
 | Configuration Properties | Description |
 |:---- |:---- |
-| `yarn.resourcemanager.zk-address` | Address of the ZK-quorum. Used both for the state-store and embedded leader-election. |
+| `hadoop.zk.address` | Address of the ZK-quorum. Used both for the state-store and embedded leader-election. |
 | `yarn.resourcemanager.ha.enabled` | Enable RM HA. |
 | `yarn.resourcemanager.ha.rm-ids` | List of logical IDs for the RMs. e.g., "rm1,rm2". |
 | `yarn.resourcemanager.hostname.`*rm-id* | For each *rm-id*, specify the hostname the RM corresponds to. Alternately, one could set each of the RM's service addresses. |

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4379113b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRestart.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRestart.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRestart.md
index 86c1a6f..68e44e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRestart.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRestart.md
@@ -93,22 +93,22 @@ This section describes the configurations involved to enable RM Restart feature.
 
 | Property | Description |
 |:---- |:---- |
-| `yarn.resourcemanager.zk-address` | Comma separated list of Host:Port pairs. Each corresponds to a ZooKeeper server (e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state. |
+| `hadoop.zk.address` | Comma separated list of Host:Port pairs. Each corresponds to a ZooKeeper server (e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state. |
 | `yarn.resourcemanager.zk-state-store.parent-path` | The full path of the root znode where RM state will be stored. Default value is /rmstore. |
 
 * Configure the retry policy state-store client uses to connect with the ZooKeeper server.
 
 | Property | Description |
 |:---- |:---- |
-| `yarn.resourcemanager.zk-num-retries` | Number of times RM tries to connect to ZooKeeper server if the connection is lost. Default value is 500. |
-| `yarn.resourcemanager.zk-retry-interval-ms` | The interval in milliseconds between retries when connecting to a ZooKeeper server. Default value is 2 seconds. |
-| `yarn.resourcemanager.zk-timeout-ms` | ZooKeeper session timeout in milliseconds. This configuration is used by the ZooKeeper server to determine when the session expires. Session expiration happens when the server does not hear from the client (i.e. no heartbeat) within the session timeout period specified by this configuration. Default value is 10 seconds |
+| `hadoop.zk.num-retries` | Number of times RM tries to connect to ZooKeeper server if the connection is lost. Default value is 500. |
+| `hadoop.zk.retry-interval-ms` | The interval in milliseconds between retries when connecting to a ZooKeeper server. Default value is 2 seconds. |
+| `hadoop.zk.timeout-ms` | ZooKeeper session timeout in milliseconds. This configuration is used by the ZooKeeper server to determine when the session expires. Session expiration happens when the server does not hear from the client (i.e. no heartbeat) within the session timeout period specified by this configuration. Default value is 10 seconds |
 
 * Configure the ACLs to be used for setting permissions on ZooKeeper znodes.
 
 | Property | Description |
 |:---- |:---- |
-| `yarn.resourcemanager.zk-acl` | ACLs to be used for setting permissions on ZooKeeper znodes. Default value is `world:anyone:rwcda` |
+| `hadoop.zk.acl` | ACLs to be used for setting permissions on ZooKeeper znodes. Default value is `world:anyone:rwcda` |
 
 ### Configurations for LevelDB based state-store implementation
 
@@ -157,7 +157,7 @@ Below is a minimum set of configurations for enabling RM work-preserving restart
        (e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state.
        This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
        as the value for yarn.resourcemanager.store.class</description>
-       <name>yarn.resourcemanager.zk-address</name>
+       <name>hadoop.zk.address</name>
        <value>127.0.0.1:2181</value>
      </property>
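
The renamed keys can also be set programmatically, not just in the XML above. A minimal sketch using org.apache.hadoop.conf.Configuration follows, with the values taken from the defaults quoted in the tables; the localhost quorum address is an assumption.

  import org.apache.hadoop.conf.Configuration;

  public class ZkConfigExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Consolidated hadoop.zk.* keys that replace the yarn.resourcemanager.zk-* names.
      conf.set("hadoop.zk.address", "127.0.0.1:2181");  // ZK quorum (assumed host:port)
      conf.setInt("hadoop.zk.num-retries", 500);        // default from the table above
      conf.setInt("hadoop.zk.retry-interval-ms", 2000); // 2 seconds
      conf.setInt("hadoop.zk.timeout-ms", 10000);       // 10 seconds
      conf.set("hadoop.zk.acl", "world:anyone:rwcda");

      System.out.println("ZK quorum: " + conf.get("hadoop.zk.address"));
    }
  }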
 




[20/23] hadoop git commit: YARN-7678. Ability to enable logging of container memory stats. Contributed by Jim Brennan

Posted by ae...@apache.org.
YARN-7678. Ability to enable logging of container memory stats. Contributed by Jim Brennan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d7956618
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d7956618
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d7956618

Branch: refs/heads/HDFS-7240
Commit: d795661868e330ac55d6ac7b0ee62fb658e03ff7
Parents: 45a4719
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 4 10:15:52 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 4 10:15:52 2018 -0600

----------------------------------------------------------------------
 .../containermanager/monitor/ContainersMonitorImpl.java        | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7956618/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index bc28646..23c89c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -59,6 +59,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private final static Logger LOG =
        LoggerFactory.getLogger(ContainersMonitorImpl.class);
+  private final static Logger AUDITLOG =
+       LoggerFactory.getLogger(ContainersMonitorImpl.class.getName()+".audit");
 
   private long monitoringInterval;
   private MonitoringThread monitoringThread;
@@ -595,8 +597,8 @@ public class ContainersMonitorImpl extends AbstractService implements
               * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
       long vmemLimit = ptInfo.getVmemLimit();
       long pmemLimit = ptInfo.getPmemLimit();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format(
+      if (AUDITLOG.isDebugEnabled()) {
+        AUDITLOG.debug(String.format(
                 "Memory usage of ProcessTree %s for container-id %s: ",
                 pId, containerId.toString()) +
                 formatUsageString(
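
The pattern above hangs a second logger off the class name with an ".audit" suffix, so the per-container memory lines can be enabled in log4j independently of the class's normal debug output. A minimal standalone sketch of the same idea; the class and method names here are illustrative, not part of the YARN code.

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public class MemoryAuditExample {
    private static final Logger LOG =
        LoggerFactory.getLogger(MemoryAuditExample.class);
    // Separate ".audit" logger: operators can raise or lower its level in
    // log4j.properties without touching the class's main logger.
    private static final Logger AUDITLOG =
        LoggerFactory.getLogger(MemoryAuditExample.class.getName() + ".audit");

    void reportUsage(String containerId, long pmemBytes, long vmemBytes) {
      if (AUDITLOG.isDebugEnabled()) {
        AUDITLOG.debug("Memory usage for container-id {}: pmem={} vmem={}",
            containerId, pmemBytes, vmemBytes);
      }
      LOG.debug("reported memory usage for {}", containerId);
    }
  }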




[11/23] hadoop git commit: YARN-7691. Add Unit Tests for ContainersLauncher. (Sampada Dehankar via asuresh)

Posted by ae...@apache.org.
YARN-7691. Add Unit Tests for ContainersLauncher. (Sampada Dehankar via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0c7cce8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0c7cce8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0c7cce8

Branch: refs/heads/HDFS-7240
Commit: c0c7cce81d5609f6347bff67929d5026d5893d75
Parents: 7f515f5
Author: Arun Suresh <as...@apache.org>
Authored: Tue Jan 2 22:03:00 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Jan 2 22:03:00 2018 -0800

----------------------------------------------------------------------
 .../launcher/TestContainersLauncher.java        | 275 +++++++++++++++++++
 1 file changed, 275 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0c7cce8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
new file mode 100644
index 0000000..d4d6c67
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
@@ -0,0 +1,275 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests to verify all the Container's Launcher Events in
+ * {@link ContainersLauncher} are handled as expected.
+ */
+public class TestContainersLauncher {
+
+  @Mock
+  private ApplicationImpl app1;
+
+  @Mock
+  private ContainerImpl container;
+
+  @Mock
+  private ApplicationId appId;
+
+  @Mock
+  private ApplicationAttemptId appAttemptId;
+
+  @Mock
+  private ContainerId containerId;
+
+  @Mock
+  private ContainersLauncherEvent event;
+
+  @Mock
+  private NodeManager.NMContext context;
+
+  @Mock
+  private AsyncDispatcher dispatcher;
+
+  @Mock
+  private ContainerExecutor exec;
+
+  @Mock
+  private LocalDirsHandlerService dirsHandler;
+
+  @Mock
+  private ContainerManagerImpl containerManager;
+
+  @Mock
+  private ExecutorService containerLauncher;
+
+  @Mock
+  private Configuration conf;
+
+  @Mock
+  private ContainerLaunch containerLaunch;
+
+  @InjectMocks
+  private ContainersLauncher tempContainersLauncher = new ContainersLauncher(
+      context, dispatcher, exec, dirsHandler, containerManager);
+
+  private ContainersLauncher spy;
+
+  @Before
+  public void setup() throws IllegalArgumentException, IllegalAccessException {
+    MockitoAnnotations.initMocks(this);
+    ConcurrentMap<ApplicationId, Application> applications =
+        new ConcurrentHashMap<>();
+    applications.put(appId, app1);
+    spy = spy(tempContainersLauncher);
+    conf = doReturn(conf).when(spy).getConfig();
+    when(event.getContainer()).thenReturn(container);
+    when(container.getContainerId()).thenReturn(containerId);
+    when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(containerId.getApplicationAttemptId().getApplicationId())
+        .thenReturn(appId);
+    when(context.getApplications()).thenReturn(applications);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testLaunchContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException {
+    Map<ContainerId, ContainerLaunch> dummyMap =
+        (Map<ContainerId, ContainerLaunch>) Whitebox.getInternalState(spy,
+            "running");
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.LAUNCH_CONTAINER);
+    assertEquals(0, dummyMap.size());
+    spy.handle(event);
+    assertEquals(1, dummyMap.size());
+    Mockito.verify(containerLauncher, Mockito.times(1))
+        .submit(Mockito.any(ContainerLaunch.class));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRelaunchContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException {
+    Map<ContainerId, ContainerLaunch> dummyMap =
+        (Map<ContainerId, ContainerLaunch>) Whitebox.getInternalState(spy,
+            "running");
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.RELAUNCH_CONTAINER);
+    assertEquals(0, dummyMap.size());
+    spy.handle(event);
+    assertEquals(1, dummyMap.size());
+    Mockito.verify(containerLauncher, Mockito.times(1))
+        .submit(Mockito.any(ContainerRelaunch.class));
+    for (ContainerId cid : dummyMap.keySet()) {
+      Object o = dummyMap.get(cid);
+      assertEquals(true, (o instanceof ContainerRelaunch));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRecoverContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException {
+    Map<ContainerId, ContainerLaunch> dummyMap =
+        (Map<ContainerId, ContainerLaunch>) Whitebox.getInternalState(spy,
+            "running");
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.RECOVER_CONTAINER);
+    assertEquals(0, dummyMap.size());
+    spy.handle(event);
+    assertEquals(1, dummyMap.size());
+    Mockito.verify(containerLauncher, Mockito.times(1))
+        .submit(Mockito.any(RecoveredContainerLaunch.class));
+    for (ContainerId cid : dummyMap.keySet()) {
+      Object o = dummyMap.get(cid);
+      assertEquals(true, (o instanceof RecoveredContainerLaunch));
+    }
+  }
+
+  @Test
+  public void testRecoverPausedContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException {
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER);
+    spy.handle(event);
+    Mockito.verify(containerLauncher, Mockito.times(1))
+        .submit(Mockito.any(RecoverPausedContainerLaunch.class));
+  }
+
+  @Test
+  public void testCleanupContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    Map<ContainerId, ContainerLaunch> dummyMap = Collections
+        .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
+    dummyMap.put(containerId, containerLaunch);
+    Whitebox.setInternalState(spy, "running", dummyMap);
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER);
+    doNothing().when(containerLaunch).cleanupContainer();
+    spy.handle(event);
+    assertEquals(0, dummyMap.size());
+    Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer();
+  }
+
+  @Test
+  public void testCleanupContainerForReINITEvent()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    Map<ContainerId, ContainerLaunch> dummyMap = Collections
+        .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
+    dummyMap.put(containerId, containerLaunch);
+    Whitebox.setInternalState(spy, "running", dummyMap);
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT);
+    doNothing().when(containerLaunch).cleanupContainer();
+    spy.handle(event);
+    assertEquals(0, dummyMap.size());
+    Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer();
+  }
+
+  @Test
+  public void testSignalContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    Map<ContainerId, ContainerLaunch> dummyMap = Collections
+        .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
+    dummyMap.put(containerId, containerLaunch);
+
+    SignalContainersLauncherEvent dummyEvent =
+        mock(SignalContainersLauncherEvent.class);
+    when(dummyEvent.getContainer()).thenReturn(container);
+    when(container.getContainerId()).thenReturn(containerId);
+    when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(containerId.getApplicationAttemptId().getApplicationId())
+        .thenReturn(appId);
+
+    Whitebox.setInternalState(spy, "running", dummyMap);
+    when(dummyEvent.getType())
+        .thenReturn(ContainersLauncherEventType.SIGNAL_CONTAINER);
+    when(dummyEvent.getCommand())
+        .thenReturn(SignalContainerCommand.GRACEFUL_SHUTDOWN);
+    doNothing().when(containerLaunch)
+        .signalContainer(SignalContainerCommand.GRACEFUL_SHUTDOWN);
+    spy.handle(dummyEvent);
+    assertEquals(1, dummyMap.size());
+    Mockito.verify(containerLaunch, Mockito.times(1))
+        .signalContainer(SignalContainerCommand.GRACEFUL_SHUTDOWN);
+  }
+
+  @Test
+  public void testPauseContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    Map<ContainerId, ContainerLaunch> dummyMap = Collections
+        .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
+    dummyMap.put(containerId, containerLaunch);
+    Whitebox.setInternalState(spy, "running", dummyMap);
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.PAUSE_CONTAINER);
+    doNothing().when(containerLaunch).pauseContainer();
+    spy.handle(event);
+    assertEquals(1, dummyMap.size());
+    Mockito.verify(containerLaunch, Mockito.times(1)).pauseContainer();
+  }
+
+  @Test
+  public void testResumeContainerEvent()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    Map<ContainerId, ContainerLaunch> dummyMap = Collections
+        .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
+    dummyMap.put(containerId, containerLaunch);
+    Whitebox.setInternalState(spy, "running", dummyMap);
+    when(event.getType())
+        .thenReturn(ContainersLauncherEventType.RESUME_CONTAINER);
+    doNothing().when(containerLaunch).resumeContainer();
+    spy.handle(event);
+    assertEquals(1, dummyMap.size());
+    Mockito.verify(containerLaunch, Mockito.times(1)).resumeContainer();
+  }
+}




[23/23] hadoop git commit: Merge branch 'trunk' into HDFS-7240

Posted by ae...@apache.org.
Merge branch 'trunk' into HDFS-7240


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/35c2d261
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/35c2d261
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/35c2d261

Branch: refs/heads/HDFS-7240
Commit: 35c2d261d262554b959553d7d1bbf567fd794203
Parents: a018c29 dc735b2
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Jan 4 12:16:42 2018 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Thu Jan 4 12:16:42 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/conf/ReconfigurableBase.java  |   4 +-
 .../java/org/apache/hadoop/fs/FileStatus.java   |  20 +-
 .../hadoop/metrics2/source/TestJvmMetrics.java  |  26 ++
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  16 +-
 .../hadoop/hdfs/DistributedFileSystem.java      |   8 +
 .../apache/hadoop/hdfs/FileChecksumHelper.java  |  15 +-
 .../apache/hadoop/hdfs/client/HdfsAdmin.java    |   7 +
 .../hadoop/hdfs/protocol/ClientProtocol.java    |  16 ++
 .../hadoop/hdfs/protocol/OpenFilesIterator.java |  36 ++-
 .../ClientNamenodeProtocolTranslatorPB.java     |  18 +-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  24 ++
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  63 +++--
 .../src/main/proto/ClientNamenodeProtocol.proto |   7 +
 ...tNamenodeProtocolServerSideTranslatorPB.java |   7 +-
 .../server/blockmanagement/BlockManager.java    |   2 +-
 .../blockmanagement/DatanodeAdminManager.java   |  25 +-
 .../blockmanagement/DatanodeDescriptor.java     |  24 +-
 .../server/diskbalancer/command/Command.java    |   7 +-
 .../diskbalancer/command/ReportCommand.java     |   2 +-
 .../federation/router/RouterRpcServer.java      |  10 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  49 +++-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  10 +-
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  |  36 ++-
 .../src/main/resources/hdfs-default.xml         |   4 +-
 .../src/main/webapps/hdfs/dfshealth.html        |   2 +-
 .../src/main/webapps/hdfs/dfshealth.js          |   2 +
 .../src/site/markdown/HDFSCommands.md           |   2 +-
 .../src/site/markdown/HDFSErasureCoding.md      |   4 +
 .../apache/hadoop/hdfs/AdminStatesBaseTest.java |  18 +-
 .../apache/hadoop/hdfs/TestDecommission.java    | 177 ++++++++++++
 .../hadoop/hdfs/TestEncryptedTransfer.java      |  52 ++++
 .../org/apache/hadoop/hdfs/TestHdfsAdmin.java   |   4 +-
 .../blockmanagement/BlockManagerTestUtil.java   |  12 +-
 .../command/TestDiskBalancerCommand.java        |   9 +
 .../hdfs/server/namenode/TestLeaseManager.java  |  48 ++--
 .../hdfs/server/namenode/TestListOpenFiles.java |  27 +-
 .../ha/TestDNFencingWithReplication.java        |  12 +-
 .../hadoop/hdfs/util/TestStripedBlockUtil.java  |  41 ++-
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  41 +--
 .../hadoop/yarn/ContainerLogAppender.java       |  76 ++---
 .../yarn/util/ProcfsBasedProcessTree.java       | 103 ++++---
 .../yarn/util/TestProcfsBasedProcessTree.java   |  15 +-
 .../yarn/server/nodemanager/NodeManager.java    |   1 +
 .../monitor/ContainersMonitorImpl.java          |   6 +-
 .../nodemanager/metrics/NodeManagerMetrics.java |   6 +-
 .../recovery/NMLeveldbStateStoreService.java    |  72 +++++
 .../recovery/NMStateStoreService.java           |  11 +
 .../launcher/TestContainersLauncher.java        | 275 +++++++++++++++++++
 .../metrics/TestNodeManagerMetrics.java         |  25 +-
 .../TestNMLeveldbStateStoreService.java         |  35 +++
 .../server/resourcemanager/RMAppManager.java    |  14 +-
 .../fair/AllocationFileLoaderService.java       | 104 ++++---
 .../fair/TestAllocationFileLoaderService.java   |  92 +++++--
 .../src/site/markdown/ResourceManagerHA.md      |   2 +-
 .../src/site/markdown/ResourceManagerRest.md    |   4 +-
 .../src/site/markdown/ResourceManagerRestart.md |  12 +-
 56 files changed, 1435 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/35c2d261/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
----------------------------------------------------------------------




[07/23] hadoop git commit: Revert "HADOOP-10054. ViewFsFileStatus.toString() is broken. Contributed by Hanisha Koneru."

Posted by ae...@apache.org.
Revert "HADOOP-10054. ViewFsFileStatus.toString() is broken. Contributed by Hanisha Koneru."

This reverts commit 37efa67e377e7fc251ee0088098f4b1700d21823.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ad39ec3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ad39ec3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ad39ec3

Branch: refs/heads/HDFS-7240
Commit: 4ad39ec3084ab45fb9bbace13082c88666a76a4c
Parents: d828748
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Jan 2 17:34:29 2018 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Jan 2 17:34:29 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileStatus.java   | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad39ec3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
index 63efc12..0663c43 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
@@ -444,18 +444,18 @@ public class FileStatus implements Writable, Comparable<Object>,
     StringBuilder sb = new StringBuilder();
     sb.append(getClass().getSimpleName()); 
     sb.append("{");
-    sb.append("path=" + getPath());
-    sb.append("; isDirectory=" + isDirectory());
+    sb.append("path=" + path);
+    sb.append("; isDirectory=" + isdir);
     if(!isDirectory()){
-      sb.append("; length=" + getLen());
-      sb.append("; replication=" + getReplication());
-      sb.append("; blocksize=" + getBlockSize());
+      sb.append("; length=" + length);
+      sb.append("; replication=" + block_replication);
+      sb.append("; blocksize=" + blocksize);
     }
-    sb.append("; modification_time=" + getModificationTime());
-    sb.append("; access_time=" + getAccessTime());
-    sb.append("; owner=" + getOwner());
-    sb.append("; group=" + getGroup());
-    sb.append("; permission=" + getPermission());
+    sb.append("; modification_time=" + modification_time);
+    sb.append("; access_time=" + access_time);
+    sb.append("; owner=" + owner);
+    sb.append("; group=" + group);
+    sb.append("; permission=" + permission);
     sb.append("; isSymlink=" + isSymlink());
     if(isSymlink()) {
       try {




[05/23] hadoop git commit: YARN-7687. ContainerLogAppender Improvements. Contributed by BELUGA BEHR.

Posted by ae...@apache.org.
YARN-7687. ContainerLogAppender Improvements. Contributed by BELUGA BEHR.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/33ae2a4a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/33ae2a4a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/33ae2a4a

Branch: refs/heads/HDFS-7240
Commit: 33ae2a4ae1a9a6561157d2ec8a1d80cb5c50ff2d
Parents: 5c28804
Author: Miklos Szegedi <sz...@apache.org>
Authored: Tue Jan 2 16:36:51 2018 -0800
Committer: Miklos Szegedi <sz...@apache.org>
Committed: Tue Jan 2 16:36:51 2018 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/ContainerLogAppender.java       | 76 ++++++++++----------
 1 file changed, 39 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/33ae2a4a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
index c49a1ab..751d9af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.yarn;
 
 import java.io.File;
 import java.io.Flushable;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.ArrayDeque;
+import java.util.Deque;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -30,49 +30,43 @@ import org.apache.log4j.spi.LoggingEvent;
 
 /**
  * A simple log4j-appender for container's logs.
- * 
  */
 @Public
 @Unstable
 public class ContainerLogAppender extends FileAppender
-  implements Flushable
-{
+    implements Flushable {
+
   private String containerLogDir;
   private String containerLogFile;
-  //so that log4j can configure it from the configuration(log4j.properties). 
   private int maxEvents;
-  private Queue<LoggingEvent> tail = null;
-  private boolean closing = false;
+  private Deque<LoggingEvent> eventBuffer;
+  private boolean closed = false;
 
   @Override
-  public void activateOptions() {
-    synchronized (this) {
-      if (maxEvents > 0) {
-        tail = new LinkedList<LoggingEvent>();
-      }
-      setFile(new File(this.containerLogDir, containerLogFile).toString());
-      setAppend(true);
-      super.activateOptions();
+  public synchronized void activateOptions() {
+    if (maxEvents > 0) {
+      this.eventBuffer = new ArrayDeque<>();
     }
+    setFile(new File(this.containerLogDir, containerLogFile).toString());
+    setAppend(true);
+    super.activateOptions();
   }
-  
+
   @Override
-  public void append(LoggingEvent event) {
-    synchronized (this) {
-      if (closing) { // When closing drop any new/transitive CLA appending
-        return;
-      }
-      if (tail == null) {
-        super.append(event);
-      } else {
-        if (tail.size() >= maxEvents) {
-          tail.remove();
-        }
-        tail.add(event);
+  public synchronized void append(LoggingEvent event) {
+    if (closed) {
+      return;
+    }
+    if (eventBuffer != null) {
+      if (eventBuffer.size() == maxEvents) {
+        eventBuffer.removeFirst();
       }
+      eventBuffer.addLast(event);
+    } else {
+      super.append(event);
     }
   }
-  
+
   @Override
   public void flush() {
     if (qw != null) {
@@ -82,13 +76,17 @@ public class ContainerLogAppender extends FileAppender
 
   @Override
   public synchronized void close() {
-    closing = true;
-    if (tail != null) {
-      for (LoggingEvent event : tail) {
-        super.append(event);
+    if (!closed) {
+      closed = true;
+      if (eventBuffer != null) {
+        for (LoggingEvent event : eventBuffer) {
+          super.append(event);
+        }
+        // let garbage collection do its work
+        eventBuffer = null;
       }
+      super.close();
     }
-    super.close();
   }
 
   /**
@@ -111,13 +109,17 @@ public class ContainerLogAppender extends FileAppender
     this.containerLogFile = containerLogFile;
   }
 
-  private static final int EVENT_SIZE = 100;
+  private static final long EVENT_SIZE = 100;
   
   public long getTotalLogFileSize() {
     return maxEvents * EVENT_SIZE;
   }
 
+  /**
+   *  Setter so that log4j can configure it from the
+   *  configuration(log4j.properties).
+   */
   public void setTotalLogFileSize(long logSize) {
-    maxEvents = (int) logSize / EVENT_SIZE;
+    maxEvents = (int)(logSize / EVENT_SIZE);
   }
 }




[14/23] hadoop git commit: YARN-7602. NM should reference the singleton JvmMetrics instance.

Posted by ae...@apache.org.
YARN-7602. NM should reference the singleton JvmMetrics instance.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2f6c038b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2f6c038b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2f6c038b

Branch: refs/heads/HDFS-7240
Commit: 2f6c038be6c23522ea64fc4e415910fb72493eb2
Parents: fe35103
Author: Haibo Chen <ha...@apache.org>
Authored: Tue Jan 2 10:04:56 2018 -0800
Committer: Haibo Chen <ha...@apache.org>
Committed: Wed Jan 3 09:41:26 2018 -0800

----------------------------------------------------------------------
 .../hadoop/metrics2/source/TestJvmMetrics.java  | 26 ++++++++++++++++++++
 .../nodemanager/metrics/NodeManagerMetrics.java |  6 ++---
 .../metrics/TestNodeManagerMetrics.java         | 25 +++++++++++++++++--
 3 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f6c038b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java
index aa1b009..37a3a2a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java
@@ -200,4 +200,30 @@ public class TestJvmMetrics {
     Assert.assertTrue(alerter.numAlerts > 0);
     Assert.assertTrue(alerter.maxGcTimePercentage >= alertGcPerc);
   }
+
+  @Test
+  public void testJvmMetricsSingletonWithSameProcessName() {
+    JvmMetrics jvmMetrics1 = org.apache.hadoop.metrics2.source.JvmMetrics
+        .initSingleton("test", null);
+    JvmMetrics jvmMetrics2 = org.apache.hadoop.metrics2.source.JvmMetrics
+        .initSingleton("test", null);
+    Assert.assertEquals("initSingleton should return the singleton instance",
+        jvmMetrics1, jvmMetrics2);
+  }
+
+  @Test
+  public void testJvmMetricsSingletonWithDifferentProcessNames() {
+    final String process1Name = "process1";
+    JvmMetrics jvmMetrics1 = org.apache.hadoop.metrics2.source.JvmMetrics
+        .initSingleton(process1Name, null);
+    final String process2Name = "process2";
+    JvmMetrics jvmMetrics2 = org.apache.hadoop.metrics2.source.JvmMetrics
+        .initSingleton(process2Name, null);
+    Assert.assertEquals("initSingleton should return the singleton instance",
+        jvmMetrics1, jvmMetrics2);
+    Assert.assertEquals("unexpected process name of the singleton instance",
+        process1Name, jvmMetrics1.processName);
+    Assert.assertEquals("unexpected process name of the singleton instance",
+        process1Name, jvmMetrics2.processName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f6c038b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
index 1e7149b..f98b3f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
@@ -99,7 +99,7 @@ public class NodeManagerMetrics {
   private long availableMB;
   private long allocatedOpportunisticMB;
 
-  public NodeManagerMetrics(JvmMetrics jvmMetrics) {
+  private NodeManagerMetrics(JvmMetrics jvmMetrics) {
     this.jvmMetrics = jvmMetrics;
   }
 
@@ -107,8 +107,8 @@ public class NodeManagerMetrics {
     return create(DefaultMetricsSystem.instance());
   }
 
-  static NodeManagerMetrics create(MetricsSystem ms) {
-    JvmMetrics jm = JvmMetrics.create("NodeManager", null, ms);
+  private static NodeManagerMetrics create(MetricsSystem ms) {
+    JvmMetrics jm = JvmMetrics.initSingleton("NodeManager", null);
     return ms.register(new NodeManagerMetrics(jm));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f6c038b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
index 5dead91..d21e7ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
@@ -19,19 +19,40 @@ package org.apache.hadoop.yarn.server.nodemanager.metrics;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
 import static org.apache.hadoop.test.MetricsAsserts.*;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestNodeManagerMetrics {
   static final int GiB = 1024; // MiB
 
-  @Test public void testNames() {
+  private NodeManagerMetrics metrics;
+
+  @Before
+  public void setup() {
     DefaultMetricsSystem.initialize("NodeManager");
-    NodeManagerMetrics metrics = NodeManagerMetrics.create();
+    metrics = NodeManagerMetrics.create();
+  }
+
+  @After
+  public void tearDown() {
+    DefaultMetricsSystem.shutdown();
+  }
+
+  @Test
+  public void testReferenceOfSingletonJvmMetrics()  {
+    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("NodeManagerModule", null);
+    Assert.assertEquals("NodeManagerMetrics should reference the singleton" +
+        " JvmMetrics instance", jvmMetrics, metrics.getJvmMetrics());
+  }
+
+  @Test public void testNames() {
     Resource total = Records.newRecord(Resource.class);
     total.setMemorySize(8*GiB);
     total.setVirtualCores(16);
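
The new tests pin down the initSingleton() contract: the first call creates the instance and later calls return that same object, even when a different process name is passed. A schematic sketch of that contract follows (not the real JvmMetrics code; the class name is illustrative):

  final class SingletonMetrics {
    private static SingletonMetrics instance;
    final String processName;

    private SingletonMetrics(String processName) {
      this.processName = processName;
    }

    static synchronized SingletonMetrics initSingleton(String processName) {
      if (instance == null) {
        instance = new SingletonMetrics(processName);
      }
      return instance; // later process names are ignored, as the tests expect
    }
  }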




[04/23] hadoop git commit: HDFS-12629. NameNode UI should report total blocks count by type - replicated and erasure coded.

Posted by ae...@apache.org.
HDFS-12629. NameNode UI should report total blocks count by type - replicated and erasure coded.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c28804b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c28804b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c28804b

Branch: refs/heads/HDFS-7240
Commit: 5c28804b953463ebdd1e9d32ad50f11887b6e277
Parents: 42a1c98
Author: Manoj Govindassamy <ma...@apache.org>
Authored: Tue Jan 2 15:05:27 2018 -0800
Committer: Manoj Govindassamy <ma...@apache.org>
Committed: Tue Jan 2 15:05:27 2018 -0800

----------------------------------------------------------------------
 .../hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html               | 2 +-
 hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c28804b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
index 45aee1e..96b1210 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
@@ -147,7 +147,7 @@
 
 <p>
   {#fs}
-  {FilesTotal|fmt_number} files and directories, {BlocksTotal|fmt_number} blocks = {ObjectsTotal|fmt_number} total filesystem object(s).
+  {FilesTotal|fmt_number} files and directories, {BlocksTotal|fmt_number} blocks ({#replicastat}{TotalReplicatedBlocks|fmt_number}{/replicastat} replicated blocks, {#ecstat}{TotalECBlockGroups|fmt_number}{/ecstat} erasure coded block groups) = {ObjectsTotal|fmt_number} total filesystem object(s).
   {#helper_fs_max_objects/}
   {/fs}
 </p>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c28804b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js
index de62622..4bc8e86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js
@@ -37,6 +37,8 @@
       {"name": "nnstat",  "url": "/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"},
       {"name": "fs",      "url": "/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState"},
       {"name": "fsn",     "url": "/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem"},
+      {"name": "replicastat",      "url": "/jmx?qry=Hadoop:service=NameNode,name=ReplicatedBlocksState"},
+      {"name": "ecstat",      "url": "/jmx?qry=Hadoop:service=NameNode,name=ECBlockGroupsState"},
       {"name": "blockstats",      "url": "/jmx?qry=Hadoop:service=NameNode,name=BlockStats"},
       {"name": "mem",     "url": "/jmx?qry=java.lang:type=Memory"}
     ];
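
The page gets those numbers from two additional JMX beans, ReplicatedBlocksState and ECBlockGroupsState. Below is a small sketch that pulls one of them over the same /jmx endpoint; the host and port are assumptions (9870 is the usual NameNode HTTP port on recent releases), and the output is raw JSON containing fields such as TotalReplicatedBlocks.

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.net.HttpURLConnection;
  import java.net.URL;
  import java.nio.charset.StandardCharsets;

  public class JmxBlockStats {
    public static void main(String[] args) throws Exception {
      // Same bean name the updated dfshealth.js queries.
      URL url = new URL("http://localhost:9870/jmx"
          + "?qry=Hadoop:service=NameNode,name=ReplicatedBlocksState");
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      try (BufferedReader in = new BufferedReader(
          new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line); // raw JSON from the NameNode's /jmx servlet
        }
      } finally {
        conn.disconnect();
      }
    }
  }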




[16/23] hadoop git commit: HDFS-12931. Handle InvalidEncryptionKeyException during DistributedFileSystem#getFileChecksum. Contributed by Mukul Kumar Singh.

Posted by ae...@apache.org.
HDFS-12931. Handle InvalidEncryptionKeyException during DistributedFileSystem#getFileChecksum. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3ba98599
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3ba98599
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3ba98599

Branch: refs/heads/HDFS-7240
Commit: 3ba985997d1dc37e5ba017dd0ab1d36083b5f77b
Parents: 4379113
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Jan 3 14:54:20 2018 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Jan 3 14:54:20 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/FileChecksumHelper.java  | 15 +++++-
 .../hadoop/hdfs/TestEncryptedTransfer.java      | 52 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ba98599/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index 689d46d..72cf147 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
@@ -303,8 +304,7 @@ final class FileChecksumHelper {
      * Return true when sounds good to continue or retry, false when severe
      * condition or totally failed.
      */
-    private boolean checksumBlock(
-        LocatedBlock locatedBlock) throws IOException {
+    private boolean checksumBlock(LocatedBlock locatedBlock) {
       ExtendedBlock block = locatedBlock.getBlock();
       if (getRemaining() < block.getNumBytes()) {
         block.setNumBytes(getRemaining());
@@ -334,6 +334,17 @@ final class FileChecksumHelper {
             blockIdx--; // repeat at blockIdx-th block
             setRefetchBlocks(true);
           }
+        } catch (InvalidEncryptionKeyException iee) {
+          if (blockIdx > getLastRetriedIndex()) {
+            LOG.debug("Got invalid encryption key error in response to "
+                    + "OP_BLOCK_CHECKSUM for file {} for block {} from "
+                    + "datanode {}. Will retry " + "the block once.",
+                  getSrc(), block, datanodes[j]);
+            setLastRetriedIndex(blockIdx);
+            done = true; // actually it's not done; but we'll retry
+            blockIdx--; // repeat at i-th block
+            getClient().clearDataEncryptionKey();
+          }
         } catch (IOException ie) {
           LOG.warn("src={}" + ", datanodes[{}]={}",
               getSrc(), j, datanodes[j], ie);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ba98599/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
index cc90863..27a5b77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
@@ -59,6 +59,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.Assert;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -319,6 +320,57 @@ public class TestEncryptedTransfer {
   }
 
   @Test
+  public void testFileChecksumWithInvalidEncryptionKey()
+      throws IOException, InterruptedException, TimeoutException {
+    if (resolverClazz != null) {
+      // TestTrustedChannelResolver does not use encryption keys.
+      return;
+    }
+    setEncryptionConfigKeys();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+
+    fs = getFileSystem(conf);
+    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+    writeTestDataToFile(fs);
+    FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+
+    BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager();
+    // Reduce key update interval and token life for testing.
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+
+    // Wait until the encryption key becomes invalid.
+    LOG.info("Wait until encryption keys become invalid...");
+
+    DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    for (DataNode dn: dataNodes) {
+      GenericTestUtils.waitFor(
+          new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+              return !dn.getBlockPoolTokenSecretManager().
+                  get(encryptionKey.blockPoolId)
+                  .hasKey(encryptionKey.keyId);
+            }
+          }, 100, 30*1000
+      );
+    }
+    LOG.info("The encryption key is invalid on all nodes now.");
+    fs.getFileChecksum(TEST_PATH);
+    // verify that InvalidEncryptionKeyException is handled properly
+    Assert.assertTrue(client.getEncryptionKey() == null);
+    Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
+    // Retry the operation after clearing the encryption key
+    FileChecksum verifyChecksum = fs.getFileChecksum(TEST_PATH);
+    Assert.assertEquals(checksum, verifyChecksum);
+  }
+
+  @Test
   public void testLongLivedClientPipelineRecovery()
       throws IOException, InterruptedException, TimeoutException {
     if (resolverClazz != null) {
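
Stepping back from the diff: the new catch block implements a retry-at-most-once-per-block policy. It clears the cached key, remembers which block index was retried, and steps the loop back so the same block is attempted with a fresh key. A minimal standalone sketch of that policy follows; KeyCache and checksumOnce are hypothetical placeholders for illustration, not HDFS classes.

    import java.io.IOException;
    import java.util.List;

    public class RetryOncePerBlock {

      /** Hypothetical stand-in for the client's cached data encryption key. */
      static class KeyCache {
        void clear() { /* drop the stale key so the next attempt negotiates a new one */ }
      }

      /** Placeholder for the real OP_BLOCK_CHECKSUM exchange; may throw on a stale key. */
      static void checksumOnce(String block) throws IOException {
      }

      static void checksumBlocks(List<String> blocks, KeyCache keys) throws IOException {
        int lastRetriedIndex = -1;
        for (int i = 0; i < blocks.size(); i++) {
          try {
            checksumOnce(blocks.get(i));
          } catch (IOException invalidKeyError) {
            if (i > lastRetriedIndex) {
              lastRetriedIndex = i;   // retry each block at most once
              keys.clear();           // force a fresh encryption key
              i--;                    // repeat the same block on the next iteration
            } else {
              throw invalidKeyError;  // second failure on the same block: give up
            }
          }
        }
      }
    }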




[13/23] hadoop git commit: MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas

Posted by ae...@apache.org.
MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe351035
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe351035
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe351035

Branch: refs/heads/HDFS-7240
Commit: fe35103591ece0209f8345aba5544313e45a073c
Parents: c9bf813
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 3 11:01:38 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 3 11:01:38 2018 -0600

----------------------------------------------------------------------
 .../hadoop/mapred/TaskAttemptListenerImpl.java  | 41 +++++++++++---------
 1 file changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe351035/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 67f8ff0..556c90c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -585,33 +585,38 @@ public class TaskAttemptListenerImpl extends CompositeService
   private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
       TaskAttemptStatus taskAttemptStatus,
       AtomicReference<TaskAttemptStatus> lastStatusRef) {
-    boolean asyncUpdatedNeeded = false;
-    TaskAttemptStatus lastStatus = lastStatusRef.get();
-
-    if (lastStatus == null) {
-      lastStatusRef.set(taskAttemptStatus);
-      asyncUpdatedNeeded = true;
-    } else {
-      List<TaskAttemptId> oldFetchFailedMaps =
-          taskAttemptStatus.fetchFailedMaps;
-
-      // merge fetchFailedMaps from the previous update
-      if (lastStatus.fetchFailedMaps != null) {
+    List<TaskAttemptId> fetchFailedMaps = taskAttemptStatus.fetchFailedMaps;
+    TaskAttemptStatus lastStatus = null;
+    boolean done = false;
+    while (!done) {
+      lastStatus = lastStatusRef.get();
+      if (lastStatus != null && lastStatus.fetchFailedMaps != null) {
+        // merge fetchFailedMaps from the previous update
         if (taskAttemptStatus.fetchFailedMaps == null) {
           taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
         } else {
-          taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps =
+              new ArrayList<>(lastStatus.fetchFailedMaps.size() +
+                  fetchFailedMaps.size());
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              fetchFailedMaps);
         }
       }
 
-      if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
-        // update failed - async dispatcher has processed it in the meantime
-        taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
-        lastStatusRef.set(taskAttemptStatus);
-        asyncUpdatedNeeded = true;
+      // lastStatusRef may be changed by either the AsyncDispatcher when
+      // it processes the update, or by another IPC server handler
+      done = lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus);
+      if (!done) {
+        LOG.info("TaskAttempt " + yarnAttemptID +
+            ": lastStatusRef changed by another thread, retrying...");
+        // let's revert taskAttemptStatus.fetchFailedMaps
+        taskAttemptStatus.fetchFailedMaps = fetchFailedMaps;
       }
     }
 
+    boolean asyncUpdatedNeeded = (lastStatus == null);
     if (asyncUpdatedNeeded) {
       context.getEventHandler().handle(
           new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
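
The heart of the fix above is a compare-and-set loop that re-merges from the caller's original list on every retry, so a half-merged status is never published and the NPE-prone race disappears. A minimal, self-contained sketch of the same pattern on a simplified status type is below; the class and field names are illustrative, not the MapReduce ones.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    public class CoalescingUpdate {

      /** Simplified stand-in for TaskAttemptStatus, illustration only. */
      static class Status {
        List<String> fetchFailedMaps;
        Status(List<String> failed) { this.fetchFailedMaps = failed; }
      }

      /** Merge the new status with whatever is pending, retrying if another thread races us. */
      static boolean coalesce(AtomicReference<Status> lastRef, Status next) {
        List<String> original = next.fetchFailedMaps;  // kept so each retry re-merges cleanly
        Status last;
        do {
          last = lastRef.get();
          if (last != null && last.fetchFailedMaps != null) {
            List<String> merged = new ArrayList<>(last.fetchFailedMaps);
            if (original != null) {
              merged.addAll(original);
            }
            next.fetchFailedMaps = merged;
          } else {
            next.fetchFailedMaps = original;
          }
        } while (!lastRef.compareAndSet(last, next));
        return last == null;  // true means the caller still needs to dispatch an update event
      }
    }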




[06/23] hadoop git commit: HADOOP-15155. Error in javadoc of ReconfigurableBase#reconfigureProperty. Contributed by Ajay Kumar.

Posted by ae...@apache.org.
HADOOP-15155. Error in javadoc of ReconfigurableBase#reconfigureProperty. Contributed by Ajay Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8287485
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8287485
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8287485

Branch: refs/heads/HDFS-7240
Commit: d82874851ea98caeb0ef5c23b7bc5d6fc14145ba
Parents: 33ae2a4
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Jan 2 17:22:55 2018 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Jan 2 17:22:55 2018 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8287485/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
index 231cd3a..23e1fda 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
@@ -214,7 +214,7 @@ public abstract class ReconfigurableBase
    * This method makes the change to this objects {@link Configuration}
    * and calls reconfigurePropertyImpl to update internal data structures.
    * This method cannot be overridden, subclasses should instead override
-   * reconfigureProperty.
+   * reconfigurePropertyImpl.
    */
   @Override
   public final void reconfigureProperty(String property, String newVal)




[08/23] hadoop git commit: YARN-7688. Miscellaneous Improvements To ProcfsBasedProcessTree. Contributed by BELUGA BEHR.

Posted by ae...@apache.org.
YARN-7688. Miscellaneous Improvements To ProcfsBasedProcessTree. Contributed by BELUGA BEHR.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/626b5103
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/626b5103
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/626b5103

Branch: refs/heads/HDFS-7240
Commit: 626b5103d44692adf3882af61bdafa40114c44f7
Parents: 4ad39ec
Author: Miklos Szegedi <sz...@apache.org>
Authored: Tue Jan 2 17:02:31 2018 -0800
Committer: Miklos Szegedi <sz...@apache.org>
Committed: Tue Jan 2 17:39:50 2018 -0800

----------------------------------------------------------------------
 .../yarn/util/ProcfsBasedProcessTree.java       | 103 +++++++++----------
 .../yarn/util/TestProcfsBasedProcessTree.java   |  15 +--
 2 files changed, 55 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/626b5103/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
index 7f81c5b..7431fdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
@@ -20,21 +20,30 @@ package org.apache.hadoop.yarn.util;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.InputStreamReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.filefilter.AndFileFilter;
+import org.apache.commons.io.filefilter.DirectoryFileFilter;
+import org.apache.commons.io.filefilter.RegexFileFilter;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,8 +94,9 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
     }
 
     public static MemInfo getMemInfoByName(String name) {
+      String searchName = StringUtils.trimToNull(name);
       for (MemInfo info : MemInfo.values()) {
-        if (info.name.trim().equalsIgnoreCase(name.trim())) {
+        if (info.name.trim().equalsIgnoreCase(searchName)) {
           return info;
         }
       }
@@ -170,7 +180,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
         return false;
       }
     } catch (SecurityException se) {
-      LOG.warn("Failed to get Operating System name. " + se);
+      LOG.warn("Failed to get Operating System name.", se);
       return false;
     }
     return true;
@@ -214,12 +224,12 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
       // Add each process to its parent.
       for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
         String pID = entry.getKey();
-        if (!pID.equals("1")) {
+        if (!"1".equals(pID)) {
           ProcessInfo pInfo = entry.getValue();
           String ppid = pInfo.getPpid();
           // If parent is init and process is not session leader,
           // attach to sessionID
-          if (ppid.equals("1")) {
+          if ("1".equals(ppid)) {
               String sid = pInfo.getSessionId().toString();
               if (!pID.equals(sid)) {
                  ppid = sid;
@@ -233,8 +243,8 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
       }
 
       // now start constructing the process-tree
-      LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>();
-      pInfoQueue.addAll(me.getChildren());
+      List<ProcessInfo> children = me.getChildren();
+      Queue<ProcessInfo> pInfoQueue = new ArrayDeque<ProcessInfo>(children);
       while (!pInfoQueue.isEmpty()) {
         ProcessInfo pInfo = pInfoQueue.remove();
         if (!processTree.containsKey(pInfo.getPid())) {
@@ -254,12 +264,10 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
         }
       }
 
-      if (LOG.isDebugEnabled()) {
-        // Log.debug the ProcfsBasedProcessTree
-        LOG.debug(this.toString());
-      }
+      LOG.debug(this);
+
       if (smapsEnabled) {
-        //Update smaps info
+        // Update smaps info
         processSMAPTree.clear();
         for (ProcessInfo p : processTree.values()) {
           if (p != null) {
@@ -296,9 +304,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
       "\t|- %s %s %d %d %s %d %d %d %d %s%n";
 
   public List<String> getCurrentProcessIDs() {
-    List<String> currentPIDs = new ArrayList<String>();
-    currentPIDs.addAll(processTree.keySet());
-    return currentPIDs;
+    return Collections.unmodifiableList(new ArrayList<>(processTree.keySet()));
   }
 
   /**
@@ -327,18 +333,17 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
 
   @Override
   public long getVirtualMemorySize(int olderThanAge) {
-    long total = UNAVAILABLE;
+    long total = 0L;
+    boolean isAvailable = false;
     for (ProcessInfo p : processTree.values()) {
       if (p != null) {
-        if (total == UNAVAILABLE ) {
-          total = 0;
-        }
+        isAvailable = true;
         if (p.getAge() > olderThanAge) {
           total += p.getVmem();
         }
       }
     }
-    return total;
+    return isAvailable ? total : UNAVAILABLE;
   }
 
   @Override
@@ -352,11 +357,11 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
     boolean isAvailable = false;
     long totalPages = 0;
     for (ProcessInfo p : processTree.values()) {
-      if ((p != null) ) {
+      if (p != null) {
+        isAvailable = true;
         if (p.getAge() > olderThanAge) {
           totalPages += p.getRssmemPage();
         }
-        isAvailable = true;
       }
     }
     return isAvailable ? totalPages * PAGE_SIZE : UNAVAILABLE; // convert # pages to byte
@@ -405,9 +410,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
               }
             }
           }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(procMemInfo.toString());
-          }
+          LOG.debug(procMemInfo);
         }
       }
     }
@@ -427,9 +430,9 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
     boolean isAvailable = false;
     for (ProcessInfo p : processTree.values()) {
       if (p != null) {
-        incJiffies += p.getDtime();
         // data is available
         isAvailable = true;
+        incJiffies += p.getDtime();
       }
     }
     if (isAvailable) {
@@ -481,21 +484,17 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
    * Get the list of all processes in the system.
    */
   private List<String> getProcessList() {
-    List<String> processList = new ArrayList<String>();
-    String[] processDirs = (new File(procfsDir)).list();
-    if (processDirs != null) {
-      for (String dir : processDirs) {
-        Matcher m = numberPattern.matcher(dir);
-        if (!m.matches()) {
-          continue;
-        }
-        try {
-          if ((new File(procfsDir, dir)).isDirectory()) {
-            processList.add(dir);
-          }
-        } catch (SecurityException s) {
-          // skip this process
-        }
+    List<String> processList = Collections.emptyList();
+    FileFilter procListFileFilter = new AndFileFilter(
+        DirectoryFileFilter.INSTANCE, new RegexFileFilter(numberPattern));
+
+    File dir = new File(procfsDir);
+    File[] processDirs = dir.listFiles(procListFileFilter);
+
+    if (ArrayUtils.isNotEmpty(processDirs)) {
+      processList = new ArrayList<String>(processDirs.length);
+      for (File processDir : processDirs) {
+        processList.add(processDir.getName());
       }
     }
     return processList;
@@ -547,7 +546,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
         ret = null;
       }
     } catch (IOException io) {
-      LOG.warn("Error reading the stream " + io);
+      LOG.warn("Error reading the stream", io);
       ret = null;
     } finally {
       // Close the streams
@@ -556,10 +555,10 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
         try {
           in.close();
         } catch (IOException i) {
-          LOG.warn("Error closing the stream " + in);
+          LOG.warn("Error closing the stream", i);
         }
       } catch (IOException i) {
-        LOG.warn("Error closing the stream " + fReader);
+        LOG.warn("Error closing the stream", i);
       }
     }
 
@@ -729,14 +728,14 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
           ret = "N/A";
         } else {
           ret = ret.replace('\0', ' '); // Replace each null char with a space
-          if (ret.equals("")) {
+          if (ret.isEmpty()) {
             // The cmdline might be empty because the process is swapped out or
             // is a zombie.
             ret = "N/A";
           }
         }
       } catch (IOException io) {
-        LOG.warn("Error reading the stream " + io);
+        LOG.warn("Error reading the stream", io);
         ret = "N/A";
       } finally {
         // Close the streams
@@ -745,10 +744,10 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
           try {
             in.close();
           } catch (IOException i) {
-            LOG.warn("Error closing the stream " + in);
+            LOG.warn("Error closing the stream", i);
           }
         } catch (IOException i) {
-          LOG.warn("Error closing the stream " + fReader);
+          LOG.warn("Error closing the stream", i);
         }
       }
 
@@ -805,11 +804,11 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
         }
       }
     } catch (FileNotFoundException f) {
-      LOG.error(f.getMessage());
+      LOG.error(f);
     } catch (IOException e) {
-      LOG.error(e.getMessage());
+      LOG.error(e);
     } catch (Throwable t) {
-      LOG.error(t.getMessage());
+      LOG.error(t);
     } finally {
       IOUtils.closeQuietly(in);
     }
@@ -839,7 +838,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
       StringBuilder sb = new StringBuilder();
       for (ProcessSmapMemoryInfo info : memoryInfoList) {
         sb.append("\n");
-        sb.append(info.toString());
+        sb.append(info);
       }
       return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/626b5103/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java
index 43a5182..215e5b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java
@@ -118,7 +118,6 @@ public class TestProcfsBasedProcessTree {
   }
 
   @Test(timeout = 30000)
-  @SuppressWarnings("deprecation")
   public void testProcessTree() throws Exception {
     try {
       Assert.assertTrue(ProcfsBasedProcessTree.isAvailable());
@@ -163,7 +162,7 @@ public class TestProcfsBasedProcessTree {
     LOG.info("Root process pid: " + pid);
     ProcfsBasedProcessTree p = createProcessTree(pid);
     p.updateProcessTree(); // initialize
-    LOG.info("ProcessTree: " + p.toString());
+    LOG.info("ProcessTree: " + p);
 
     File leaf = new File(lowestDescendant);
     // wait till lowest descendant process of Rougue Task starts execution
@@ -176,7 +175,7 @@ public class TestProcfsBasedProcessTree {
     }
 
     p.updateProcessTree(); // reconstruct
-    LOG.info("ProcessTree: " + p.toString());
+    LOG.info("ProcessTree: " + p);
 
     // Verify the orphaned pid is In process tree
     String lostpid = getPidFromPidFile(lostDescendant);
@@ -395,7 +394,6 @@ public class TestProcfsBasedProcessTree {
    *           files.
    */
   @Test(timeout = 30000)
-  @SuppressWarnings("deprecation")
   public void testCpuAndMemoryForProcessTree() throws IOException {
 
     // test processes
@@ -908,13 +906,8 @@ public class TestProcfsBasedProcessTree {
       throws IOException {
     for (String pid : pids) {
       File pidDir = new File(procfsRootDir, pid);
-      pidDir.mkdir();
-      if (!pidDir.exists()) {
-        throw new IOException("couldn't make process directory under "
-            + "fake procfs");
-      } else {
-        LOG.info("created pid dir");
-      }
+      FileUtils.forceMkdir(pidDir);
+      LOG.info("created pid dir: " + pidDir);
     }
   }
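
The rewritten getProcessList() above delegates the directory and numeric-name checks to commons-io filters instead of hand-rolled loops. A standalone sketch of the same combination, listing numeric directories under /proc, is below; the regex is assumed to match the class's numberPattern and is an illustration only.

    import java.io.File;
    import java.io.FileFilter;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;
    import org.apache.commons.io.filefilter.AndFileFilter;
    import org.apache.commons.io.filefilter.DirectoryFileFilter;
    import org.apache.commons.io.filefilter.RegexFileFilter;

    public class ListProcfsPids {
      public static void main(String[] args) {
        // Assumed pid pattern: one or more digits with no leading zero.
        Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
        FileFilter pidDirFilter = new AndFileFilter(
            DirectoryFileFilter.INSTANCE, new RegexFileFilter(numberPattern));

        File[] pidDirs = new File("/proc").listFiles(pidDirFilter);
        List<String> pids = new ArrayList<>();
        if (pidDirs != null) {
          for (File pidDir : pidDirs) {
            pids.add(pidDir.getName());  // the directory name is the pid
          }
        }
        System.out.println(pids);
      }
    }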
 




[02/23] hadoop git commit: HDFS-12351. Explicitly describe the minimal number of DataNodes required to support an EC policy in EC document. Contributed by Hanisha Koneru.

Posted by ae...@apache.org.
HDFS-12351. Explicitly describe the minimal number of DataNodes required to support an EC policy in EC document. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b4d11337
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b4d11337
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b4d11337

Branch: refs/heads/HDFS-7240
Commit: b4d11337c9271f29dd403d2393812f2ab6f35b35
Parents: dfe0cd8
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Jan 2 12:54:40 2018 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Jan 2 12:54:40 2018 -0800

----------------------------------------------------------------------
 .../hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md           | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4d11337/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
index 4459c94..60fd3ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
@@ -99,6 +99,10 @@ Deployment
 
   Encoding and decoding work consumes additional CPU on both HDFS clients and DataNodes.
 
+  Erasure coding requires a minimum of as many DataNodes in the cluster as
+  the configured EC stripe width. For EC policy RS (6,3), this means
+  a minimum of 9 DataNodes.
+
   Erasure coded files are also spread across racks for rack fault-tolerance.
   This means that when reading and writing striped files, most operations are off-rack.
   Network bisection bandwidth is thus very important.




[09/23] hadoop git commit: YARN-6894. RM Apps API returns only active apps when query parameter queue used. Contributed by Gergely Novák.

Posted by ae...@apache.org.
YARN-6894. RM Apps API returns only active apps when query parameter queue used. Contributed by Gergely Novák.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/80440231
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/80440231
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/80440231

Branch: refs/heads/HDFS-7240
Commit: 80440231d49e518ab6411367d7d8474155ecca2b
Parents: 626b510
Author: Miklos Szegedi <sz...@apache.org>
Authored: Tue Jan 2 17:59:10 2018 -0800
Committer: Miklos Szegedi <sz...@apache.org>
Committed: Tue Jan 2 17:59:10 2018 -0800

----------------------------------------------------------------------
 .../hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/80440231/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
index f478403..09e4727 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
@@ -1316,13 +1316,13 @@ With the Applications API, you can obtain a collection of resources, each of whi
 
 ### Query Parameters Supported
 
-Multiple parameters can be specified for GET operations. The started and finished times have a begin and end parameter to allow you to specify ranges. For example, one could request all applications that started between 1:00am and 2:00pm on 12/19/2011 with startedTimeBegin=1324256400&startedTimeEnd=1324303200. If the Begin parameter is not specified, it defaults to 0, and if the End parameter is not specified, it defaults to infinity.
+Multiple parameters can be specified for GET operations. The started and finished times have a begin and end parameter to allow you to specify ranges. For example, one could request all applications that started between 1:00am and 2:00pm on 12/19/2011 with startedTimeBegin=1324256400&startedTimeEnd=1324303200. If the Begin parameter is not specified, it defaults to 0, and if the End parameter is not specified, it defaults to infinity. All query parameters for this api will filter on all applications. However the `queue` query parameter will only implicitly filter on unfinished applications that are currently in the given queue.
 
       * state [deprecated] - state of the application
       * states - applications matching the given application states, specified as a comma-separated list.
       * finalStatus - the final status of the application - reported by the application itself
       * user - user name
-      * queue - queue name
+      * queue - unfinished applications that are currently in this queue
       * limit - total number of app objects to be returned
       * startedTimeBegin - applications with start time beginning with this time, specified in ms since epoch
       * startedTimeEnd - applications with start time ending with this time, specified in ms since epoch
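
In practice, the subtlety called out above is easiest to see by hitting the apps endpoint with only the queue parameter and noticing that finished applications never come back. A hedged Java sketch of such a query follows; the ResourceManager address, port, and queue name are assumptions for illustration.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class QueryAppsByQueue {
      public static void main(String[] args) throws Exception {
        // Assumed ResourceManager web address and queue name; adjust for your cluster.
        String rm = args.length > 0 ? args[0] : "http://localhost:8088";
        URL url = new URL(rm + "/ws/v1/cluster/apps?queue=default");
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream()))) {
          // Only applications still sitting in the 'default' queue are returned;
          // finished applications do not appear in this response.
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }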




[22/23] hadoop git commit: HDFS-12860. StripedBlockUtil#getRangesInternalBlocks throws exception for the block group size larger than 2GB. (Contributed by Lei (Eddy) Xu)

Posted by ae...@apache.org.
HDFS-12860. StripedBlockUtil#getRangesInternalBlocks throws exception for the block group size larger than 2GB. (Contributed by Lei (Eddy) Xu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dc735b28
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dc735b28
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dc735b28

Branch: refs/heads/HDFS-7240
Commit: dc735b286bb656903df49aee776d22ee0c61f860
Parents: 739d3c3
Author: Lei Xu <le...@apache.org>
Authored: Thu Jan 4 10:16:40 2018 -0800
Committer: Lei Xu <le...@apache.org>
Committed: Thu Jan 4 10:54:56 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 63 +++++++++++++++-----
 .../hadoop/hdfs/util/TestStripedBlockUtil.java  | 41 ++++++++++++-
 2 files changed, 86 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc735b28/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 9e24576..9bad45d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -396,7 +396,9 @@ public class StripedBlockUtil {
       long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
     Preconditions.checkArgument(
         rangeStartInBlockGroup <= rangeEndInBlockGroup &&
-            rangeEndInBlockGroup < blockGroup.getBlockSize());
+            rangeEndInBlockGroup < blockGroup.getBlockSize(),
+        "start=%s end=%s blockSize=%s", rangeStartInBlockGroup,
+        rangeEndInBlockGroup, blockGroup.getBlockSize());
     long len = rangeEndInBlockGroup - rangeStartInBlockGroup + 1;
     int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
     int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
@@ -578,28 +580,39 @@ public class StripedBlockUtil {
   public static class StripingCell {
     final ErasureCodingPolicy ecPolicy;
     /** Logical order in a block group, used when doing I/O to a block group. */
-    final int idxInBlkGroup;
-    final int idxInInternalBlk;
-    final int idxInStripe;
+    private final long idxInBlkGroup;
+    private final long idxInInternalBlk;
+    private final int idxInStripe;
     /**
      * When a logical byte range is mapped to a set of cells, it might
      * partially overlap with the first and last cells. This field and the
      * {@link #size} variable represent the start offset and size of the
      * overlap.
      */
-    final int offset;
-    final int size;
+    private final long offset;
+    private final int size;
 
-    StripingCell(ErasureCodingPolicy ecPolicy, int cellSize, int idxInBlkGroup,
-        int offset) {
+    StripingCell(ErasureCodingPolicy ecPolicy, int cellSize, long idxInBlkGroup,
+        long offset) {
       this.ecPolicy = ecPolicy;
       this.idxInBlkGroup = idxInBlkGroup;
       this.idxInInternalBlk = idxInBlkGroup / ecPolicy.getNumDataUnits();
-      this.idxInStripe = idxInBlkGroup -
-          this.idxInInternalBlk * ecPolicy.getNumDataUnits();
+      this.idxInStripe = (int)(idxInBlkGroup -
+          this.idxInInternalBlk * ecPolicy.getNumDataUnits());
       this.offset = offset;
       this.size = cellSize;
     }
+
+    int getIdxInStripe() {
+      return idxInStripe;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("StripingCell(idxInBlkGroup=%d, " +
+          "idxInInternalBlk=%d, idxInStrip=%d, offset=%d, size=%d)",
+          idxInBlkGroup, idxInInternalBlk, idxInStripe, offset, size);
+    }
   }
 
   /**
@@ -646,7 +659,9 @@ public class StripedBlockUtil {
     public int missingChunksNum = 0;
 
     public AlignedStripe(long offsetInBlock, long length, int width) {
-      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0,
+          "OffsetInBlock(%s) and length(%s) must be non-negative",
+          offsetInBlock, length);
       this.range = new VerticalRange(offsetInBlock, length);
       this.chunks = new StripingChunk[width];
     }
@@ -665,9 +680,9 @@ public class StripedBlockUtil {
 
     @Override
     public String toString() {
-      return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock +
-          ", fetchedChunksNum=" + fetchedChunksNum +
-          ", missingChunksNum=" + missingChunksNum;
+      return "AlignedStripe(Offset=" + range.offsetInBlock + ", length=" +
+          range.spanInBlock + ", fetchedChunksNum=" + fetchedChunksNum +
+          ", missingChunksNum=" + missingChunksNum + ")";
     }
   }
 
@@ -698,7 +713,9 @@ public class StripedBlockUtil {
     public long spanInBlock;
 
     public VerticalRange(long offsetInBlock, long length) {
-      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0,
+          "OffsetInBlock(%s) and length(%s) must be non-negative",
+          offsetInBlock, length);
       this.offsetInBlock = offsetInBlock;
       this.spanInBlock = length;
     }
@@ -707,6 +724,12 @@ public class StripedBlockUtil {
     public boolean include(long pos) {
       return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
     }
+
+    @Override
+    public String toString() {
+      return String.format("VerticalRange(offsetInBlock=%d, spanInBlock=%d)",
+          this.offsetInBlock, this.spanInBlock);
+    }
   }
 
   /**
@@ -880,7 +903,9 @@ public class StripedBlockUtil {
     final long length;
 
     public StripeRange(long offsetInBlock, long length) {
-      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0,
+          "Offset(%s) and length(%s) must be non-negative", offsetInBlock,
+          length);
       this.offsetInBlock = offsetInBlock;
       this.length = length;
     }
@@ -892,6 +917,12 @@ public class StripedBlockUtil {
     public long getLength() {
       return length;
     }
+
+    @Override
+    public String toString() {
+      return String.format("StripeRange(offsetInBlock=%d, length=%d)",
+          offsetInBlock, length);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc735b28/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
index 5de63eb..fe6bb09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
@@ -40,6 +40,7 @@ import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Need to cover the following combinations:
@@ -127,7 +128,7 @@ public class TestStripedBlockUtil {
     return (byte) (((i + 13) * 29) & BYTE_MASK);
   }
 
-  private LocatedStripedBlock createDummyLocatedBlock(int bgSize) {
+  private LocatedStripedBlock createDummyLocatedBlock(long bgSize) {
     final long blockGroupID = -1048576;
     DatanodeInfo[] locs = new DatanodeInfo[groupSize];
     String[] storageIDs = new String[groupSize];
@@ -160,7 +161,7 @@ public class TestStripedBlockUtil {
       Preconditions.checkState(done % cellSize == 0);
       StripingCell cell =
           new StripingCell(ecPolicy, cellSize, done / cellSize, 0);
-      int idxInStripe = cell.idxInStripe;
+      int idxInStripe = cell.getIdxInStripe();
       int size = Math.min(cellSize, bgSize - done);
       for (int i = 0; i < size; i++) {
         bufs[idxInStripe][pos[idxInStripe] + i] = hashIntToByte(done + i);
@@ -283,4 +284,40 @@ public class TestStripedBlockUtil {
       }
     }
   }
+
+  /**
+   * Test dividing a byte range that located above the 2GB range, which is
+   * {@link Integer#MAX_VALUE}.
+   *
+   * HDFS-12860 occurs when {@link VerticalRange#offsetInBlock} is larger than
+   * {@link Integer#MAX_VALUE}
+   *
+   * Take RS-6-3-1024k EC policy as example:
+   *  <li>cellSize = 1MB</li>
+   *  <li>The first {@link VerticalRange#offsetInBlock} that is larger than
+   *  {@link Integer#MAX_VALUE} is Math.ceilInteger.MAX_VALUE / cellSize = 2048
+   *  </li>
+   *  <li>The first offset in block group that causes HDFS-12860 is:
+   *  2048 * cellSize * dataBlocks (6)</li>
+   */
+  @Test
+  public void testDivideOneStripeLargeBlockSize() {
+    ByteBuffer buffer = ByteBuffer.allocate(stripeSize);
+
+    // This offset will cause overflow before HDFS-12860.
+    long offsetInInternalBlk = Integer.MAX_VALUE / cellSize + 10;
+    long rangeStartInBlockGroup = offsetInInternalBlk * dataBlocks * cellSize;
+    long rangeEndInBlockGroup = rangeStartInBlockGroup +
+        dataBlocks / 2 * cellSize - 1;
+    // each block is 4GB, each block group has 4GB * (6 + 3) = 36GB.
+    long blockGroupSize = 4096L * cellSize * groupSize;
+    LocatedStripedBlock blockGroup = createDummyLocatedBlock(blockGroupSize);
+    AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
+        cellSize, blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup,
+        buffer);
+    long offset = offsetInInternalBlk * cellSize;
+    assertTrue(offset > Integer.MAX_VALUE);
+    assertEquals(offset, stripes[0].range.offsetInBlock);
+    assertEquals(1, stripes.length);
+  }
 }
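
The underlying bug is ordinary int arithmetic: once the cell index multiplied by the cell size exceeds Integer.MAX_VALUE, the product wraps negative and trips the non-negative precondition. A tiny standalone sketch, assuming the 1 MB cell size of RS-6-3-1024k, shows the wrap and the widened-to-long fix:

    public class CellOffsetOverflow {
      public static void main(String[] args) {
        int cellSize = 1024 * 1024;                               // 1 MB cells, as in RS-6-3-1024k
        int idxInInternalBlk = Integer.MAX_VALUE / cellSize + 10; // 2057

        int broken = idxInInternalBlk * cellSize;                 // int math wraps to a negative value
        long fixed = (long) idxInInternalBlk * cellSize;          // widen before multiplying

        System.out.println("int  offset = " + broken);            // negative, trips the
                                                                  // "must be non-negative" check
        System.out.println("long offset = " + fixed);             // just over 2 GB, the real offset
      }
    }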




[19/23] hadoop git commit: HDFS-12913. TestDNFencingWithReplication sometimes fails because of NameNode in standby state. Contributed by Zsolt Venczel.

Posted by ae...@apache.org.
HDFS-12913. TestDNFencingWithReplication sometimes fails because of NameNode in standby state. Contributed by Zsolt Venczel.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/45a47198
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/45a47198
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/45a47198

Branch: refs/heads/HDFS-7240
Commit: 45a47198b43a96817679d5bf9e570d405555e3eb
Parents: 2a48b35
Author: Sean Mackrory <ma...@apache.org>
Authored: Thu Jan 4 08:22:01 2018 -0700
Committer: Sean Mackrory <ma...@apache.org>
Committed: Thu Jan 4 08:37:21 2018 -0700

----------------------------------------------------------------------
 .../namenode/ha/TestDNFencingWithReplication.java       | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/45a47198/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
index c91d4de..ee00da4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
@@ -60,11 +60,14 @@ public class TestDNFencingWithReplication {
   private static class ReplicationToggler extends RepeatingTestThread {
     private final FileSystem fs;
     private final Path path;
+    private final MiniDFSCluster cluster;
 
-    public ReplicationToggler(TestContext ctx, FileSystem fs, Path p) {
+    ReplicationToggler(TestContext ctx, FileSystem fs, Path p,
+                       MiniDFSCluster cluster) {
       super(ctx);
       this.fs = fs;
       this.path = p;
+      this.cluster = cluster;
     }
 
     @Override
@@ -81,6 +84,7 @@ public class TestDNFencingWithReplication {
           @Override
           public Boolean get() {
             try {
+              cluster.waitActive();
               BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, 10);
               Assert.assertEquals(1, blocks.length);
               return blocks[0].getHosts().length == replicas;
@@ -90,8 +94,8 @@ public class TestDNFencingWithReplication {
           }
         }, 100, 60000);
       } catch (TimeoutException te) {
-        throw new IOException("Timed out waiting for " + replicas + " replicas " +
-            "on path " + path);
+        throw new IOException("Timed out waiting for " + replicas +
+                " replicas on path " + path);
       }
     }
     
@@ -122,7 +126,7 @@ public class TestDNFencingWithReplication {
       for (int i = 0; i < NUM_THREADS; i++) {
         Path p = new Path("/test-" + i);
         DFSTestUtil.createFile(fs, p, BLOCK_SIZE*10, (short)3, (long)i);
-        togglers.addThread(new ReplicationToggler(togglers, fs, p));
+        togglers.addThread(new ReplicationToggler(togglers, fs, p, cluster));
       }
       
       // Start a separate thread which will make sure that replication




[10/23] hadoop git commit: YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg.

Posted by ae...@apache.org.
YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f515f57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f515f57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f515f57

Branch: refs/heads/HDFS-7240
Commit: 7f515f57ede74dae787994f37bfafd5d20c9aa4c
Parents: 8044023
Author: Miklos Szegedi <sz...@apache.org>
Authored: Tue Jan 2 18:03:04 2018 -0800
Committer: Miklos Szegedi <sz...@apache.org>
Committed: Tue Jan 2 18:03:04 2018 -0800

----------------------------------------------------------------------
 .../yarn/server/nodemanager/NodeManager.java    |  1 +
 .../recovery/NMLeveldbStateStoreService.java    | 72 ++++++++++++++++++++
 .../recovery/NMStateStoreService.java           | 11 +++
 .../TestNMLeveldbStateStoreService.java         | 35 ++++++++++
 4 files changed, 119 insertions(+)
----------------------------------------------------------------------
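
Ahead of the diff below, a compact hedged sketch of the pattern the change introduces: every state-store write catches the database error, flips a one-way health flag, and reports the failure so the node can be marked unhealthy. The HealthReporter interface and the simulated write are illustrative placeholders, not the YARN API.

    import java.io.IOException;

    public class FailFastStore {

      /** Illustrative stand-in for the component that flags the node unhealthy. */
      interface HealthReporter { void reportException(Exception e); }

      private volatile boolean healthy = true;   // one-way flag: never flips back to true
      private HealthReporter reporter;           // may be set after the store has started

      void setReporter(HealthReporter r) { this.reporter = r; }
      boolean isHealthy() { return healthy; }

      void put(String key, byte[] value) throws IOException {
        try {
          // ... the actual key/value write would go here ...
          throw new RuntimeException("simulated database failure");
        } catch (RuntimeException dbErr) {
          markUnhealthy(dbErr);
          throw new IOException(dbErr);          // callers still see the failure
        }
      }

      private void markUnhealthy(Exception dbErr) {
        if (!healthy) {
          return;                                // already reported once, nothing more to do
        }
        healthy = false;
        HealthReporter r = reporter;
        if (r != null) {
          r.reportException(dbErr);              // lets the node report itself unhealthy
        }
      }
    }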


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 179b01e..6cb8560 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -451,6 +451,7 @@ public class NodeManager extends CompositeService
     // so that we make sure everything is up before registering with RM. 
     addService(nodeStatusUpdater);
     ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
+    nmStore.setNodeStatusUpdater(nodeStatusUpdater);
 
     super.serviceInit(conf);
     // TODO add local dirs to del

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 3455874..0f659d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 import org.apache.hadoop.yarn.server.records.Version;
@@ -155,6 +156,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   private DB db;
   private boolean isNewlyCreated;
+  private boolean isHealthy;
   private Timer compactionTimer;
 
   /**
@@ -169,6 +171,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   @Override
   protected void startStorage() throws IOException {
+    // Assume that we're healthy when we start
+    isHealthy = true;
   }
 
   @Override
@@ -187,6 +191,36 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     return isNewlyCreated;
   }
 
+  /**
+   * If the state store throws an error after recovery has been performed
+   * then we can not trust it any more to reflect the NM state. We need to
+   * mark the store and node unhealthy.
+   * Errors during the recovery will cause a service failure and thus a NM
+   * start failure. Do not need to mark the store unhealthy for those.
+   * @param dbErr Exception
+   */
+  private void markStoreUnHealthy(DBException dbErr) {
+    // Always log the error here, we might not see the error in the caller
+    LOG.error("Statestore exception: ", dbErr);
+    // We have already been marked unhealthy so no need to do it again.
+    if (!isHealthy) {
+      return;
+    }
+    // Mark unhealthy, an out of band heartbeat will be sent and the state
+    // will remain unhealthy (not recoverable).
+    // No need to close the store: does not make any difference at this point.
+    isHealthy = false;
+    // We could get here before the nodeStatusUpdater is set
+    NodeStatusUpdater nsu = getNodeStatusUpdater();
+    if (nsu != null) {
+      nsu.reportException(dbErr);
+    }
+  }
+
+  @VisibleForTesting
+  boolean isHealthy() {
+    return isHealthy;
+  }
 
   @Override
   public List<RecoveredContainerState> loadContainersState()
@@ -354,6 +388,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -378,6 +413,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -393,6 +429,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -408,6 +445,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -424,6 +462,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -441,6 +480,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(diagnostics.toString()));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -459,6 +499,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -488,6 +529,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -504,6 +546,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -520,6 +563,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(Integer.toString(exitCode)));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -532,6 +576,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -544,6 +589,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(workDir));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -556,6 +602,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(logDir));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -589,6 +636,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -638,6 +686,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), p.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -659,6 +708,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -815,6 +865,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), proto.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -838,6 +889,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -861,6 +913,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -926,6 +979,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), taskProto.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -936,6 +990,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1009,6 +1064,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1023,6 +1079,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(dbKey), pb.getProto().toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1096,6 +1153,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(expTime.toString()));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1107,6 +1165,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1157,6 +1216,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), proto.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1167,6 +1227,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1198,6 +1259,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
 
@@ -1361,6 +1423,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       try {
         db.delete(bytes(dbkey));
       } catch (DBException e) {
+        markStoreUnHealthy(e);
         throw new IOException(e);
       }
       return;
@@ -1375,6 +1438,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(fullkey), data);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1386,6 +1450,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(fullkey));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1409,6 +1474,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         candidates.add(key);
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     } finally {
       if (iter != null) {
@@ -1422,6 +1488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         db.delete(bytes(key));
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1555,6 +1622,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     return db;
   }
 
+  @VisibleForTesting
+  void setDB(DB testDb) {
+    this.db = testDb;
+  }
+
   /**
    * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
    * 2) Any incompatible change of state-store is a major upgrade, and any

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 598ea9e..f9b86bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 
@@ -51,10 +52,20 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Reso
 @Unstable
 public abstract class NMStateStoreService extends AbstractService {
 
+  private NodeStatusUpdater nodeStatusUpdater = null;
+
   public NMStateStoreService(String name) {
     super(name);
   }
 
+  protected NodeStatusUpdater getNodeStatusUpdater() {
+    return nodeStatusUpdater;
+  }
+
+  public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
+    this.nodeStatusUpdater = nodeStatusUpdater;
+  }
+
   public static class RecoveredApplicationsState {
     List<ContainerManagerApplicationProto> applications;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 3cac5b4..de667d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestNMLeveldbStateStoreService {
   private static final File TMP_DIR = new File(
@@ -1165,6 +1168,38 @@ public class TestNMLeveldbStateStoreService {
         resourceMappings.getAssignedResources("numa").equals(numaRes));
   }
 
+  @Test
+  public void testStateStoreNodeHealth() throws IOException {
+    // keep the working DB clean, break a temp DB
+    DB keepDB = stateStore.getDB();
+    DB myMocked = mock(DB.class);
+    stateStore.setDB(myMocked);
+
+    ApplicationId appId = ApplicationId.newInstance(1234, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    DBException toThrow = new DBException();
+    Mockito.doThrow(toThrow).when(myMocked).
+        put(any(byte[].class), any(byte[].class));
+    // write some data
+    try {
+      // chose a simple method; it could be any of the "void" store methods
+      ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+      stateStore.storeContainerKilled(containerId);
+    } catch (IOException ioErr) {
+      // Cause should be wrapped DBException
+      assertTrue(ioErr.getCause() instanceof DBException);
+      // check the store is marked unhealthy
+      assertFalse("Statestore should have been unhealthy",
+          stateStore.isHealthy());
+      return;
+    } finally {
+      // restore the working DB
+      stateStore.setDB(keepDB);
+    }
+    Assert.fail("Expected exception not thrown");
+  }
+
   private StartContainerRequest storeMockContainer(ContainerId containerId)
       throws IOException {
     // create a container request


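For reference, the error-handling pattern that the statestore patch above applies to every leveldb write can be sketched in isolation as below. This is a minimal illustration, not code from the patch: the CheckedStore class and its fields are hypothetical, while DB#put and DBException come from the org.iq80.leveldb API already used by NMLeveldbStateStoreService (in the real service, marking the store unhealthy also reaches NodeStatusUpdater#reportException so an out-of-band heartbeat is sent).

import java.io.IOException;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;

/** Hypothetical wrapper illustrating the catch, mark-unhealthy, rethrow pattern. */
class CheckedStore {
  private final DB db;
  private volatile boolean healthy = true;

  CheckedStore(DB db) {
    this.db = db;
  }

  boolean isHealthy() {
    return healthy;
  }

  void put(byte[] key, byte[] value) throws IOException {
    try {
      db.put(key, value);        // leveldb write; throws DBException on failure
    } catch (DBException e) {
      healthy = false;           // the real service also reports via NodeStatusUpdater#reportException
      throw new IOException(e);  // callers still see the wrapped failure
    }
  }
}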


[17/23] hadoop git commit: YARN-7622. Allow fair-scheduler configuration on HDFS (gphillips via rkanter)

Posted by ae...@apache.org.
YARN-7622. Allow fair-scheduler configuration on HDFS (gphillips via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7a550448
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7a550448
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7a550448

Branch: refs/heads/HDFS-7240
Commit: 7a550448036c9d140d2c35c684cc8023ceb8880e
Parents: 3ba9859
Author: Robert Kanter <rk...@apache.org>
Authored: Wed Jan 3 15:31:50 2018 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Wed Jan 3 15:31:50 2018 -0800

----------------------------------------------------------------------
 .../fair/AllocationFileLoaderService.java       | 104 +++++++++++--------
 .../fair/TestAllocationFileLoaderService.java   |  92 ++++++++++++----
 2 files changed, 133 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a550448/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 597af94..f73e05f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -17,25 +17,15 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -45,8 +35,8 @@ import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.Permission;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -57,7 +47,17 @@ import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 
-import com.google.common.annotations.VisibleForTesting;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 @Public
 @Unstable
@@ -77,6 +77,9 @@ public class AllocationFileLoaderService extends AbstractService {
 
   public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
+  //Permitted allocation file filesystems (case insensitive)
+  private static final String SUPPORTED_FS_REGEX =
+      "(?i)(hdfs)|(file)|(s3a)|(viewfs)";
   private static final String ROOT = "root";
   private static final AccessControlList EVERYBODY_ACL =
       new AccessControlList("*");
@@ -85,12 +88,14 @@ public class AllocationFileLoaderService extends AbstractService {
 
   private final Clock clock;
 
-  private long lastSuccessfulReload; // Last time we successfully reloaded queues
-  private boolean lastReloadAttemptFailed = false;
-  
-  // Path to XML file containing allocations. 
-  private File allocFile;
-  
+  // Last time we successfully reloaded queues
+  private volatile long lastSuccessfulReload;
+  private volatile boolean lastReloadAttemptFailed = false;
+
+  // Path to XML file containing allocations.
+  private Path allocFile;
+  private FileSystem fs;
+
   private Listener reloadListener;
   
   @VisibleForTesting
@@ -108,19 +113,19 @@ public class AllocationFileLoaderService extends AbstractService {
   public AllocationFileLoaderService(Clock clock) {
     super(AllocationFileLoaderService.class.getName());
     this.clock = clock;
-    
   }
   
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     this.allocFile = getAllocationFile(conf);
-    if (allocFile != null) {
-      reloadThread = new Thread() {
-        @Override
-        public void run() {
-          while (running) {
+    if(this.allocFile != null) {
+      this.fs = allocFile.getFileSystem(conf);
+      reloadThread = new Thread(() -> {
+        while (running) {
+          try {
             long time = clock.getTime();
-            long lastModified = allocFile.lastModified();
+            long lastModified =
+                fs.getFileStatus(allocFile).getModificationTime();
             if (lastModified > lastSuccessfulReload &&
                 time > lastModified + ALLOC_RELOAD_WAIT_MS) {
               try {
@@ -136,19 +141,21 @@ public class AllocationFileLoaderService extends AbstractService {
               if (!lastReloadAttemptFailed) {
                 LOG.warn("Failed to reload fair scheduler config file because" +
                     " last modified returned 0. File exists: "
-                    + allocFile.exists());
+                    + fs.exists(allocFile));
               }
               lastReloadAttemptFailed = true;
             }
-            try {
-              Thread.sleep(reloadIntervalMs);
-            } catch (InterruptedException ex) {
-              LOG.info(
-                  "Interrupted while waiting to reload alloc configuration");
-            }
+          } catch (IOException e) {
+            LOG.info("Exception while loading allocation file: " + e);
+          }
+          try {
+            Thread.sleep(reloadIntervalMs);
+          } catch (InterruptedException ex) {
+            LOG.info(
+                "Interrupted while waiting to reload alloc configuration");
           }
         }
-      };
+      });
       reloadThread.setName("AllocationFileReloader");
       reloadThread.setDaemon(true);
     }
@@ -182,24 +189,31 @@ public class AllocationFileLoaderService extends AbstractService {
    * path is relative, it is searched for in the
    * classpath, but loaded like a regular File.
    */
-  public File getAllocationFile(Configuration conf) {
+  public Path getAllocationFile(Configuration conf)
+      throws UnsupportedFileSystemException {
     String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
         FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
-    File allocFile = new File(allocFilePath);
-    if (!allocFile.isAbsolute()) {
+    Path allocPath = new Path(allocFilePath);
+    String allocPathScheme = allocPath.toUri().getScheme();
+    if(allocPathScheme != null && !allocPathScheme.matches(SUPPORTED_FS_REGEX)){
+      throw new UnsupportedFileSystemException("Allocation file "
+          + allocFilePath + " uses an unsupported filesystem");
+    } else if (!allocPath.isAbsolute()) {
       URL url = Thread.currentThread().getContextClassLoader()
           .getResource(allocFilePath);
       if (url == null) {
         LOG.warn(allocFilePath + " not found on the classpath.");
-        allocFile = null;
+        allocPath = null;
       } else if (!url.getProtocol().equalsIgnoreCase("file")) {
         throw new RuntimeException("Allocation file " + url
             + " found on the classpath is not on the local filesystem.");
       } else {
-        allocFile = new File(url.getPath());
+        allocPath = new Path(url.getProtocol(), null, url.getPath());
       }
+    } else if (allocPath.isAbsoluteAndSchemeAuthorityNull()){
+      allocPath = new Path("file", null, allocFilePath);
     }
-    return allocFile;
+    return allocPath;
   }
   
   public synchronized void setReloadListener(Listener reloadListener) {
@@ -274,7 +288,7 @@ public class AllocationFileLoaderService extends AbstractService {
       DocumentBuilderFactory.newInstance();
     docBuilderFactory.setIgnoringComments(true);
     DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-    Document doc = builder.parse(allocFile);
+    Document doc = builder.parse(fs.open(allocFile));
     Element root = doc.getDocumentElement();
     if (!"allocations".equals(root.getTagName()))
       throw new AllocationConfigurationException("Bad fair scheduler config " +
@@ -437,7 +451,7 @@ public class AllocationFileLoaderService extends AbstractService {
           fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
           reservationAcls, newPlacementPolicy, configuredQueues,
           globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
-    
+
     lastSuccessfulReload = clock.getTime();
     lastReloadAttemptFailed = false;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a550448/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index 67b46f9..c46ecd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -17,17 +17,12 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
@@ -38,6 +33,23 @@ import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TestAllocationFileLoaderService {
   
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -45,16 +57,60 @@ public class TestAllocationFileLoaderService {
 
   final static String ALLOC_FILE = new File(TEST_DIR,
       "test-queues").getAbsolutePath();
-  
+  private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
+
   @Test
-  public void testGetAllocationFileFromClasspath() {
-    Configuration conf = new Configuration();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
-        "test-fair-scheduler.xml");
+  public void testGetAllocationFileFromFileSystem()
+      throws IOException, URISyntaxException {
+    Configuration conf = new YarnConfiguration();
+    File baseDir =
+        new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
+    FileUtil.fullyDelete(baseDir);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    MiniDFSCluster hdfsCluster = builder.build();
+    String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort()
+        + Path.SEPARATOR + TEST_FAIRSCHED_XML;
+
+    URL fschedURL = Thread.currentThread().getContextClassLoader()
+        .getResource(TEST_FAIRSCHED_XML);
+    FileSystem fs = FileSystem.get(conf);
+    fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
+
     AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
-    File allocationFile = allocLoader.getAllocationFile(conf);
-    assertEquals("test-fair-scheduler.xml", allocationFile.getName());
-    assertTrue(allocationFile.exists());
+    Path allocationFile = allocLoader.getAllocationFile(conf);
+    assertEquals(fsAllocPath, allocationFile.toString());
+    assertTrue(fs.exists(allocationFile));
+
+    hdfsCluster.shutdown(true);
+  }
+
+  @Test (expected = UnsupportedFileSystemException.class)
+  public void testDenyGetAllocationFileFromUnsupportedFileSystem()
+      throws UnsupportedFileSystemException {
+    Configuration conf = new YarnConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
+    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+
+    allocLoader.getAllocationFile(conf);
+  }
+
+  @Test
+  public void testGetAllocationFileFromClasspath() {
+    try {
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.get(conf);
+      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+          TEST_FAIRSCHED_XML);
+      AllocationFileLoaderService allocLoader =
+          new AllocationFileLoaderService();
+      Path allocationFile = allocLoader.getAllocationFile(conf);
+      assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
+      assertTrue(fs.exists(allocationFile));
+    } catch (IOException e) {
+      fail("Unable to access allocation file from classpath: " + e);
+    }
   }
   
   @Test (timeout = 10000)


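To illustrate the effect of the YARN-7622 change above, a minimal sketch of pointing the fair scheduler allocation file at HDFS follows. The property key is the one behind FairSchedulerConfiguration.ALLOCATION_FILE (yarn.scheduler.fair.allocation.file); the namenode host, port, file path and class name are illustrative only and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FairSchedulerAllocOnHdfs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Scheme must be one of the permitted filesystems (hdfs, file, s3a, viewfs),
    // otherwise AllocationFileLoaderService.getAllocationFile() rejects it.
    conf.set("yarn.scheduler.fair.allocation.file",
        "hdfs://namenode.example.com:8020/yarn/fair-scheduler.xml");

    // The patched loader resolves and reads the file through the Hadoop FileSystem API:
    Path alloc = new Path(conf.get("yarn.scheduler.fair.allocation.file"));
    FileSystem fs = alloc.getFileSystem(conf);
    System.out.println("Allocation file present: " + fs.exists(alloc));
  }
}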


[12/23] hadoop git commit: YARN-7692. Skip validating priority acls while recovering applications. Contributed by Sunil G.

Posted by ae...@apache.org.
YARN-7692. Skip validating priority acls while recovering applications. Contributed by Sunil G.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c9bf813c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c9bf813c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c9bf813c

Branch: refs/heads/HDFS-7240
Commit: c9bf813c9a6c018d14f2bef49ba086ec0e60c761
Parents: c0c7cce
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Wed Jan 3 18:20:04 2018 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Wed Jan 3 18:20:04 2018 +0530

----------------------------------------------------------------------
 .../yarn/server/resourcemanager/RMAppManager.java     | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9bf813c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 5ea1152..2983077 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -393,10 +393,16 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     // Verify and get the update application priority and set back to
     // submissionContext
     UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
-    Priority appPriority = scheduler.checkAndGetApplicationPriority(
-        submissionContext.getPriority(), userUgi, submissionContext.getQueue(),
-        applicationId);
-    submissionContext.setPriority(appPriority);
+
+    // Application priority needed to be validated only while submitting. During
+    // recovery, validated priority could be recovered from submission context.
+    if (!isRecovery) {
+      Priority appPriority = scheduler.checkAndGetApplicationPriority(
+          submissionContext.getPriority(), userUgi,
+          submissionContext.getQueue(),
+          applicationId);
+      submissionContext.setPriority(appPriority);
+    }
 
     // Since FairScheduler queue mapping is done inside scheduler,
     // if FairScheduler is used and the queue doesn't exist, we should not




[21/23] hadoop git commit: HDFS-12987. Document - Disabling the Lazy persist file scrubber. Contributed by Karthik Palanisamy.

Posted by ae...@apache.org.
HDFS-12987. Document - Disabling the Lazy persist file scrubber. Contributed by Karthik Palanisamy.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/739d3c39
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/739d3c39
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/739d3c39

Branch: refs/heads/HDFS-7240
Commit: 739d3c394d772783fe23b386274d58954c9a0236
Parents: d795661
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Jan 4 10:32:00 2018 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jan 4 10:32:00 2018 -0800

----------------------------------------------------------------------
 .../hadoop-hdfs/src/main/resources/hdfs-default.xml              | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/739d3c39/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7b5ccbc..831cda8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -475,8 +475,8 @@
   <description>
     The NameNode periodically scans the namespace for LazyPersist files with
     missing blocks and unlinks them from the namespace. This configuration key
-    controls the interval between successive scans. Set it to a negative value
-    to disable this behavior.
+    controls the interval between successive scans. If this value is set to 0,
+    the file scrubber is disabled.
   </description>
 </property>
 <property>


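For operators, the documented behaviour above comes down to a single setting. The key name below is taken from hdfs-default.xml rather than from this hunk, and the snippet only illustrates the documented semantics; in practice the value is set in hdfs-site.xml on the NameNode, not programmatically.

import org.apache.hadoop.conf.Configuration;

public class LazyPersistScrubberSetting {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key name assumed from hdfs-default.xml (not visible in the hunk above).
    // 0 disables the LazyPersist file scrubber; a positive value is the
    // interval in seconds between successive namespace scans.
    conf.setLong("dfs.namenode.lazypersist.file.scrub.interval.sec", 0L);

    long interval =
        conf.getLong("dfs.namenode.lazypersist.file.scrub.interval.sec", 300);
    System.out.println("Scrubber enabled: " + (interval > 0));
  }
}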


[03/23] hadoop git commit: HDFS-11847. Enhance dfsadmin listOpenFiles command to list files blocking datanode decommissioning.

Posted by ae...@apache.org.
HDFS-11847. Enhance dfsadmin listOpenFiles command to list files blocking datanode decommissioning.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42a1c985
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42a1c985
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42a1c985

Branch: refs/heads/HDFS-7240
Commit: 42a1c98597e6dba2e371510a6b2b6b1fb94e4090
Parents: b4d1133
Author: Manoj Govindassamy <ma...@apache.org>
Authored: Tue Jan 2 14:59:36 2018 -0800
Committer: Manoj Govindassamy <ma...@apache.org>
Committed: Tue Jan 2 14:59:36 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  16 +-
 .../hadoop/hdfs/DistributedFileSystem.java      |   8 +
 .../apache/hadoop/hdfs/client/HdfsAdmin.java    |   7 +
 .../hadoop/hdfs/protocol/ClientProtocol.java    |  16 ++
 .../hadoop/hdfs/protocol/OpenFilesIterator.java |  36 +++-
 .../ClientNamenodeProtocolTranslatorPB.java     |  18 +-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  24 +++
 .../src/main/proto/ClientNamenodeProtocol.proto |   7 +
 ...tNamenodeProtocolServerSideTranslatorPB.java |   7 +-
 .../server/blockmanagement/BlockManager.java    |   2 +-
 .../blockmanagement/DatanodeAdminManager.java   |  25 ++-
 .../blockmanagement/DatanodeDescriptor.java     |  24 ++-
 .../federation/router/RouterRpcServer.java      |  10 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  49 ++++-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  10 +-
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  |  36 +++-
 .../src/site/markdown/HDFSCommands.md           |   2 +-
 .../apache/hadoop/hdfs/AdminStatesBaseTest.java |  18 +-
 .../apache/hadoop/hdfs/TestDecommission.java    | 177 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/TestHdfsAdmin.java   |   4 +-
 .../blockmanagement/BlockManagerTestUtil.java   |  12 +-
 .../hdfs/server/namenode/TestLeaseManager.java  |  48 ++---
 .../hdfs/server/namenode/TestListOpenFiles.java |  27 ++-
 23 files changed, 520 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 59f553b..c774132 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.ReencryptionStatusIterator;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -3084,8 +3085,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    *
    * @throws IOException
    */
+  @Deprecated
   public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
     checkOpen();
-    return new OpenFilesIterator(namenode, tracer);
+    return listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  /**
+   * Get a remote iterator to the open files list by type, managed by NameNode.
+   *
+   * @param openFilesTypes
+   * @throws IOException
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    checkOpen();
+    return new OpenFilesIterator(namenode, tracer, openFilesTypes);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 6b0c57a..85e5964 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -3079,10 +3080,17 @@ public class DistributedFileSystem extends FileSystem
    * <p/>
    * This method can only be called by HDFS superusers.
    */
+  @Deprecated
   public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
     return dfs.listOpenFiles();
   }
 
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    return dfs.listOpenFiles(openFilesTypes);
+  }
+
+
   /**
    * Create a {@link HdfsDataOutputStreamBuilder} to append a file on DFS.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index 9116167..e620039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -652,8 +653,14 @@ public class HdfsAdmin {
    * <p/>
    * This method can only be called by HDFS superusers.
    */
+  @Deprecated
   public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
     return dfs.listOpenFiles();
   }
 
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    return dfs.listOpenFiles(openFilesTypes);
+  }
+
 }
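A usage sketch of the HdfsAdmin overload added above: an HDFS superuser can ask specifically for the open files that are blocking datanode decommissioning. The class name and namenode URI below are illustrative; the API calls are the ones added by this patch or already present in hadoop-hdfs-client.

import java.net.URI;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

public class ListDecommissionBlockers {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HdfsAdmin admin =
        new HdfsAdmin(URI.create("hdfs://namenode.example.com:8020"), conf);

    // Filter to files whose open leases are holding up decommission.
    RemoteIterator<OpenFileEntry> files =
        admin.listOpenFiles(EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
    while (files.hasNext()) {
      OpenFileEntry entry = files.next();
      System.out.println(entry.getFilePath() + " (client: "
          + entry.getClientName() + ")");
    }
  }
}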

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index e8a33dd..38c242a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -1713,5 +1714,20 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
+  @Deprecated
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
+
+  /**
+   * List open files in the system in batches. INode id is the cursor and the
+   * open files returned in a batch will have their INode ids greater than
+   * the cursor INode id. Open files can only be requested by the superuser,
+   * and the list across batches is not atomic.
+   *
+   * @param prevId the cursor INode id.
+   * @param openFilesTypes types to filter the open files
+   * @throws IOException
+   */
+  @Idempotent
+  BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
index c24e585..d113d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -35,20 +36,51 @@ import org.apache.htrace.core.Tracer;
 @InterfaceStability.Evolving
 public class OpenFilesIterator extends
     BatchedRemoteIterator<Long, OpenFileEntry> {
+
+  /**
+   * Open file types to filter the results.
+   */
+  public enum OpenFilesType {
+
+    ALL_OPEN_FILES((short) 0x01),
+    BLOCKING_DECOMMISSION((short) 0x02);
+
+    private final short mode;
+    OpenFilesType(short mode) {
+      this.mode = mode;
+    }
+
+    public short getMode() {
+      return mode;
+    }
+
+    public static OpenFilesType valueOf(short num) {
+      for (OpenFilesType type : OpenFilesType.values()) {
+        if (type.getMode() == num) {
+          return type;
+        }
+      }
+      return null;
+    }
+  }
+
   private final ClientProtocol namenode;
   private final Tracer tracer;
+  private final EnumSet<OpenFilesType> types;
 
-  public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) {
+  public OpenFilesIterator(ClientProtocol namenode, Tracer tracer,
+      EnumSet<OpenFilesType> types) {
     super(HdfsConstants.GRANDFATHER_INODE_ID);
     this.namenode = namenode;
     this.tracer = tracer;
+    this.types = types;
   }
 
   @Override
   public BatchedEntries<OpenFileEntry> makeRequest(Long prevId)
       throws IOException {
     try (TraceScope ignored = tracer.newScope("listOpenFiles")) {
-      return namenode.listOpenFiles(prevId);
+      return namenode.listOpenFiles(prevId, types);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 9ccc2fa..ea5c951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
@@ -1893,13 +1894,24 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Deprecated
   @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
-    ListOpenFilesRequestProto req =
-        ListOpenFilesRequestProto.newBuilder().setId(prevId).build();
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  @Override
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    ListOpenFilesRequestProto.Builder req =
+        ListOpenFilesRequestProto.newBuilder().setId(prevId);
+    if (openFilesTypes != null) {
+      req.addAllTypes(PBHelperClient.convertOpenFileTypes(openFilesTypes));
+    }
     try {
-      ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req);
+      ListOpenFilesResponseProto response =
+          rpcProxy.listOpenFiles(null, req.build());
       List<OpenFileEntry> openFileEntries =
           Lists.newArrayListWithCapacity(response.getEntriesCount());
       for (OpenFilesBatchResponseProto p : response.getEntriesList()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 813083f..3180f70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
@@ -131,6 +132,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsE
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
@@ -3258,5 +3260,27 @@ public class PBHelperClient {
         .build();
   }
 
+  public static EnumSet<OpenFilesType> convertOpenFileTypes(
+      List<OpenFilesTypeProto> openFilesTypeProtos) {
+    EnumSet<OpenFilesType> types = EnumSet.noneOf(OpenFilesType.class);
+    for (OpenFilesTypeProto af : openFilesTypeProtos) {
+      OpenFilesType type = OpenFilesType.valueOf((short)af.getNumber());
+      if (type != null) {
+        types.add(type);
+      }
+    }
+    return types;
+  }
 
+  public static List<OpenFilesTypeProto> convertOpenFileTypes(
+      EnumSet<OpenFilesType> types) {
+    List<OpenFilesTypeProto> typeProtos = new ArrayList<>();
+    for (OpenFilesType type : types) {
+      OpenFilesTypeProto typeProto = OpenFilesTypeProto.valueOf(type.getMode());
+      if (typeProto != null) {
+        typeProtos.add(typeProto);
+      }
+    }
+    return typeProtos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index b33462b..f247da8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -796,8 +796,14 @@ message GetEditsFromTxidResponseProto {
   required EventsListProto eventsList = 1;
 }
 
+enum OpenFilesTypeProto {
+  ALL_OPEN_FILES = 1;
+  BLOCKING_DECOMMISSION = 2;
+}
+
 message ListOpenFilesRequestProto {
   required int64 id = 1;
+  repeated OpenFilesTypeProto types = 2;
 }
 
 message OpenFilesBatchResponseProto {
@@ -810,6 +816,7 @@ message OpenFilesBatchResponseProto {
 message ListOpenFilesResponseProto {
   repeated OpenFilesBatchResponseProto entries = 1;
   required bool hasMore = 2;
+  repeated OpenFilesTypeProto types = 3;
 }
 
 service ClientNamenodeProtocol {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index d63460b..a9d2d1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
@@ -1852,13 +1853,17 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public ListOpenFilesResponseProto listOpenFiles(RpcController controller,
       ListOpenFilesRequestProto req) throws ServiceException {
     try {
-      BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId());
+      EnumSet<OpenFilesType> openFilesTypes =
+          PBHelperClient.convertOpenFileTypes(req.getTypesList());
+      BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId(),
+          openFilesTypes);
       ListOpenFilesResponseProto.Builder builder =
           ListOpenFilesResponseProto.newBuilder();
       builder.setHasMore(entries.hasMore());
       for (int i = 0; i < entries.size(); i++) {
         builder.addEntries(PBHelperClient.convert(entries.get(i)));
       }
+      builder.addAllTypes(req.getTypesList());
       return builder.build();
     } catch (IOException e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 59e06c6..6b7175d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2294,7 +2294,7 @@ public class BlockManager implements BlockStatsMXBean {
    * If there were any reconstruction requests that timed out, reap them
    * and put them back into the neededReconstruction queue
    */
-  private void processPendingReconstructions() {
+  void processPendingReconstructions() {
     BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
     if (timedOutItems != null) {
       namesystem.writeLock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
index 928036a..e338591 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -36,10 +36,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.util.ChunkedArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -649,8 +653,10 @@ public class DatanodeAdminManager {
         boolean pruneReliableBlocks) {
       boolean firstReplicationLog = true;
       // Low redundancy in UC Blocks only
-      int lowRedundancyInOpenFiles = 0;
-      // All low redundancy blocks. Includes lowRedundancyInOpenFiles.
+      int lowRedundancyBlocksInOpenFiles = 0;
+      LightWeightHashSet<Long> lowRedundancyOpenFiles =
+          new LightWeightLinkedSet<>();
+      // All low redundancy blocks. Includes lowRedundancyOpenFiles.
       int lowRedundancyBlocks = 0;
       // All maintenance and decommission replicas.
       int outOfServiceOnlyReplicas = 0;
@@ -737,15 +743,24 @@ public class DatanodeAdminManager {
         // Update various counts
         lowRedundancyBlocks++;
         if (bc.isUnderConstruction()) {
-          lowRedundancyInOpenFiles++;
+          INode ucFile = namesystem.getFSDirectory().getInode(bc.getId());
+          if (!(ucFile instanceof INodeFile) ||
+              !ucFile.asFile().isUnderConstruction()) {
+            LOG.warn("File with id " + bc.getId() + " is not under " +
+                "construction. Skipping add to low redundancy open files!");
+          } else {
+            lowRedundancyBlocksInOpenFiles++;
+            lowRedundancyOpenFiles.add(ucFile.getId());
+          }
         }
         if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
           outOfServiceOnlyReplicas++;
         }
       }
 
-      datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles,
-          lowRedundancyBlocks, outOfServiceOnlyReplicas);
+      datanode.getLeavingServiceStatus().set(lowRedundancyBlocksInOpenFiles,
+          lowRedundancyOpenFiles, lowRedundancyBlocks,
+          outOfServiceOnlyReplicas);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 618bc13..16ffb43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -831,17 +832,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** Leaving service status. */
   public class LeavingServiceStatus {
     private int underReplicatedBlocks;
+    private int underReplicatedBlocksInOpenFiles;
     private int outOfServiceOnlyReplicas;
-    private int underReplicatedInOpenFiles;
+    private LightWeightHashSet<Long> underReplicatedOpenFiles =
+        new LightWeightLinkedSet<>();
     private long startTime;
     
-    synchronized void set(int underRepInOpenFiles, int underRepBlocks,
-        int outOfServiceOnlyRep) {
+    synchronized void set(int lowRedundancyBlocksInOpenFiles,
+        LightWeightHashSet<Long> underRepInOpenFiles,
+        int underRepBlocks, int outOfServiceOnlyRep) {
       if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return;
       }
-      underReplicatedInOpenFiles = underRepInOpenFiles;
+      underReplicatedOpenFiles = underRepInOpenFiles;
       underReplicatedBlocks = underRepBlocks;
+      underReplicatedBlocksInOpenFiles = lowRedundancyBlocksInOpenFiles;
       outOfServiceOnlyReplicas = outOfServiceOnlyRep;
     }
 
@@ -864,7 +869,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
       if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
-      return underReplicatedInOpenFiles;
+      return underReplicatedBlocksInOpenFiles;
+    }
+    /** @return the set of INode ids of the under-replicated open files */
+    public synchronized LightWeightHashSet<Long> getOpenFiles() {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
+        return new LightWeightLinkedSet<>();
+      }
+      return underReplicatedOpenFiles;
     }
     /** Set start time */
     public synchronized void setStartTime(long time) {
@@ -880,7 +892,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       }
       return startTime;
     }
-  }  // End of class DecommissioningStatus
+  }  // End of class LeavingServiceStatus
 
   /**
    * Set the flag to indicate if this datanode is disallowed from communicating

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index c6cd595..537eaf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -1935,9 +1936,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     return null;
   }
 
+  @Deprecated
   @Override
-  public BatchedEntries<OpenFileEntry> listOpenFiles(long arg0)
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  @Override
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     checkOperation(OperationCategory.READ, false);
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 286c41c..54decc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
@@ -276,6 +277,7 @@ import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -1762,12 +1764,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * the open files returned in a batch will have their INode ids greater than
   * this cursor. Open files can only be requested by the super user and the
    * list across batches does not represent a consistent view of all open files.
+   * TODO: HDFS-12969 - to report open files by type.
    *
    * @param prevId the cursor INode id.
+   * @param openFilesTypes types of the open files to be listed.
    * @throws IOException
    */
-  BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId)
-      throws IOException {
+  BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     final String operationName = "listOpenFiles";
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
@@ -1775,7 +1779,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BatchedListEntries<OpenFileEntry> batchedListEntries;
     try {
       checkOperation(OperationCategory.READ);
-      batchedListEntries = leaseManager.getUnderConstructionFiles(prevId);
+      if (openFilesTypes.contains(OpenFilesType.ALL_OPEN_FILES)) {
+        batchedListEntries = leaseManager.getUnderConstructionFiles(prevId);
+      } else {
+        if (openFilesTypes.contains(OpenFilesType.BLOCKING_DECOMMISSION)) {
+          batchedListEntries = getFilesBlockingDecom(prevId);
+        } else {
+          throw new IllegalArgumentException("Unknown OpenFileType: "
+              + openFilesTypes);
+        }
+      }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, null);
       throw e;
@@ -1786,6 +1799,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return batchedListEntries;
   }
 
+  public BatchedListEntries<OpenFileEntry> getFilesBlockingDecom(long prevId) {
+    assert hasReadLock();
+    final List<OpenFileEntry> openFileEntries = Lists.newArrayList();
+    LightWeightHashSet<Long> openFileIds = new LightWeightHashSet<>();
+    for (DatanodeDescriptor dataNode :
+        blockManager.getDatanodeManager().getDatanodes()) {
+      for (long ucFileId : dataNode.getLeavingServiceStatus().getOpenFiles()) {
+        INode ucFile = getFSDirectory().getInode(ucFileId);
+        if (ucFile == null || ucFileId <= prevId ||
+            openFileIds.contains(ucFileId)) {
+          // probably got deleted or
+          // part of previous batch or
+          // already part of the current batch
+          continue;
+        }
+        Preconditions.checkState(ucFile instanceof INodeFile);
+        openFileIds.add(ucFileId);
+        INodeFile inodeFile = ucFile.asFile();
+        openFileEntries.add(new OpenFileEntry(
+            inodeFile.getId(), inodeFile.getFullPathName(),
+            inodeFile.getFileUnderConstructionFeature().getClientName(),
+            inodeFile.getFileUnderConstructionFeature().getClientMachine()));
+        if (openFileIds.size() >= this.maxListOpenFilesResponses) {
+          return new BatchedListEntries<>(openFileEntries, true);
+        }
+      }
+    }
+    return new BatchedListEntries<>(openFileEntries, false);
+  }
+
   private String metaSaveAsString() {
     StringWriter sw = new StringWriter();
     PrintWriter pw = new PrintWriter(sw);

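For reference, a minimal client-side sketch of the new type-filtered listing,
built only on APIs that appear elsewhere in this change (HdfsAdmin#listOpenFiles
with an EnumSet, OpenFilesType, OpenFileEntry); the class name
ListBlockingDecomFiles is illustrative and not part of the patch:

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;
    import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
    import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

    public class ListBlockingDecomFiles {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
        // Ask the NameNode only for the open files that are holding up an
        // ongoing decommission; pass ALL_OPEN_FILES for the old behavior.
        RemoteIterator<OpenFileEntry> it =
            admin.listOpenFiles(EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
        while (it.hasNext()) {
          OpenFileEntry entry = it.next();
          // getId() and getFilePath() are used the same way in the tests below.
          System.out.println(entry.getId() + "\t" + entry.getFilePath());
        }
      }
    }

The same EnumSet is what DFSAdmin builds from the -blockingDecommission flag in
the change below.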
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 94bd15f..80f1ba3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -1334,11 +1335,18 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.metaSave(filename);
   }
 
+  @Deprecated
   @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  @Override // ClientProtocol
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     checkNNStartup();
-    return namesystem.listOpenFiles(prevId);
+    return namesystem.listOpenFiles(prevId, openFilesTypes);
   }
 
   @Override // ClientProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index f4985a6..7367309 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -462,7 +464,7 @@ public class DFSAdmin extends FsShell {
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
     "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
-    "\t[-listOpenFiles]\n" +
+    "\t[-listOpenFiles [-blockingDecommission]]\n" +
     "\t[-help [cmd]]\n";
 
   /**
@@ -913,8 +915,21 @@ public class DFSAdmin extends FsShell {
    * Usage: hdfs dfsadmin -listOpenFiles
    *
    * @throws IOException
+   * @param argv command line arguments, may include -blockingDecommission.
    */
-  public int listOpenFiles() throws IOException {
+  public int listOpenFiles(String[] argv) throws IOException {
+    List<OpenFilesType> types = new ArrayList<>();
+    if (argv != null) {
+      List<String> args = new ArrayList<>(Arrays.asList(argv));
+      if (StringUtils.popOption("-blockingDecommission", args)) {
+        types.add(OpenFilesType.BLOCKING_DECOMMISSION);
+      }
+    }
+    if (types.isEmpty()) {
+      types.add(OpenFilesType.ALL_OPEN_FILES);
+    }
+    EnumSet<OpenFilesType> openFilesTypes = EnumSet.copyOf(types);
+
     DistributedFileSystem dfs = getDFS();
     Configuration dfsConf = dfs.getConf();
     URI dfsUri = dfs.getUri();
@@ -926,9 +941,9 @@ public class DFSAdmin extends FsShell {
           dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class,
           UserGroupInformation.getCurrentUser(), false);
       openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(),
-          FsTracer.get(dfsConf));
+          FsTracer.get(dfsConf), openFilesTypes);
     } else {
-      openFilesRemoteIterator = dfs.listOpenFiles();
+      openFilesRemoteIterator = dfs.listOpenFiles(openFilesTypes);
     }
     printOpenFiles(openFilesRemoteIterator);
     return 0;
@@ -1214,9 +1229,11 @@ public class DFSAdmin extends FsShell {
         + "\tIf 'incremental' is specified, it will be an incremental\n"
         + "\tblock report; otherwise, it will be a full block report.\n";
 
-    String listOpenFiles = "-listOpenFiles\n"
+    String listOpenFiles = "-listOpenFiles [-blockingDecommission]\n"
         + "\tList all open files currently managed by the NameNode along\n"
-        + "\twith client name and client machine accessing them.\n";
+        + "\twith client name and client machine accessing them.\n"
+        + "\tIf 'blockingDecommission' option is specified, it will list the\n"
+        + "\topen files only that are blocking the ongoing Decommission.";
 
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
@@ -1964,7 +1981,8 @@ public class DFSAdmin extends FsShell {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
     } else if ("-listOpenFiles".equals(cmd)) {
-      System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]");
+      System.err.println("Usage: hdfs dfsadmin"
+          + " [-listOpenFiles [-blockingDecommission]]");
     } else {
       System.err.println("Usage: hdfs dfsadmin");
       System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@@ -2119,7 +2137,7 @@ public class DFSAdmin extends FsShell {
         return exitCode;
       }
     } else if ("-listOpenFiles".equals(cmd)) {
-      if (argv.length != 1) {
+      if ((argv.length != 1) && (argv.length != 2)) {
         printUsage(cmd);
         return exitCode;
       }
@@ -2205,7 +2223,7 @@ public class DFSAdmin extends FsShell {
       } else if ("-triggerBlockReport".equals(cmd)) {
         exitCode = triggerBlockReport(argv);
       } else if ("-listOpenFiles".equals(cmd)) {
-        exitCode = listOpenFiles();
+        exitCode = listOpenFiles(argv);
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);

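As a usage sketch (not part of the patch), the new option can also be driven
programmatically via ToolRunner, the same way the tests in this change do; the
class name RunListOpenFiles is illustrative:

    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class RunListOpenFiles {
      public static void main(String[] args) throws Exception {
        // Equivalent to: hdfs dfsadmin -listOpenFiles -blockingDecommission
        int exitCode = ToolRunner.run(new DFSAdmin(new HdfsConfiguration()),
            new String[] {"-listOpenFiles", "-blockingDecommission"});
        System.exit(exitCode);
      }
    }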
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 316b955..a13116f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -409,7 +409,7 @@ Usage:
 | `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
 | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted |
 | `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. |
-| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. |
+| `-listOpenFiles` `[-blockingDecommission]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. If 'blockingDecommission' is specified, only the open files that are blocking the ongoing decommission are listed. |
 | `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
 
 Runs a HDFS dfsadmin client.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
index c0cef19..5d96b7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
@@ -388,9 +388,19 @@ public class AdminStatesBaseTest {
   protected void startCluster(int numNameNodes, int numDatanodes,
       boolean setupHostsFile, long[] nodesCapacity,
       boolean checkDataNodeHostConfig) throws IOException {
+    startCluster(numNameNodes, numDatanodes, setupHostsFile, nodesCapacity,
+        checkDataNodeHostConfig, true);
+  }
+
+  protected void startCluster(int numNameNodes, int numDatanodes,
+      boolean setupHostsFile, long[] nodesCapacity,
+      boolean checkDataNodeHostConfig, boolean federation) throws IOException {
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
         .numDataNodes(numDatanodes);
+    if (federation) {
+      builder.nnTopology(
+          MiniDFSNNTopology.simpleFederatedTopology(numNameNodes));
+    }
     if (setupHostsFile) {
       builder.setupHostsFile(setupHostsFile);
     }
@@ -413,6 +423,12 @@ public class AdminStatesBaseTest {
     startCluster(numNameNodes, numDatanodes, false, null, false);
   }
 
+  protected void startSimpleCluster(int numNameNodes, int numDatanodes)
+      throws IOException {
+    startCluster(numNameNodes, numDatanodes, false, null, false, false);
+  }
+
+
   protected void startSimpleHACluster(int numDatanodes) throws IOException {
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index ac14a2a..d82025c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -22,16 +22,23 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.text.StrBuilder;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -60,7 +67,9 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -651,6 +660,174 @@ public class TestDecommission extends AdminStatesBaseTest {
     fdos.close();
   }
 
+  private static String scanIntoString(final ByteArrayOutputStream baos) {
+    final StrBuilder sb = new StrBuilder();
+    final Scanner scanner = new Scanner(baos.toString());
+    while (scanner.hasNextLine()) {
+      sb.appendln(scanner.nextLine());
+    }
+    scanner.close();
+    return sb.toString();
+  }
+
+  private boolean verifyOpenFilesListing(String message,
+      HashSet<Path> closedFileSet,
+      HashMap<Path, FSDataOutputStream> openFilesMap,
+      ByteArrayOutputStream out, int expOpenFilesListSize) {
+    final String outStr = scanIntoString(out);
+    LOG.info(message + " - stdout: \n" + outStr);
+    for (Path closedFilePath : closedFileSet) {
+      if(outStr.contains(closedFilePath.toString())) {
+        return false;
+      }
+    }
+    HashSet<Path> openFilesNotListed = new HashSet<>();
+    for (Path openFilePath : openFilesMap.keySet()) {
+      if(!outStr.contains(openFilePath.toString())) {
+        openFilesNotListed.add(openFilePath);
+      }
+    }
+    int actualOpenFilesListedSize =
+        openFilesMap.size() - openFilesNotListed.size();
+    if (actualOpenFilesListedSize >= expOpenFilesListSize) {
+      return true;
+    } else {
+      LOG.info("Open files that are not listed yet: " + openFilesNotListed);
+      return false;
+    }
+  }
+
+  private void verifyOpenFilesBlockingDecommission(HashSet<Path> closedFileSet,
+      HashMap<Path, FSDataOutputStream> openFilesMap, final int maxOpenFiles)
+      throws Exception {
+    final PrintStream oldStreamOut = System.out;
+    try {
+      final ByteArrayOutputStream toolOut = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(toolOut));
+      final DFSAdmin dfsAdmin = new DFSAdmin(getConf());
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            toolOut.reset();
+            assertEquals(0, ToolRunner.run(dfsAdmin,
+                new String[]{"-listOpenFiles", "-blockingDecommission"}));
+            toolOut.flush();
+            return verifyOpenFilesListing(
+                "dfsadmin -listOpenFiles -blockingDecommission",
+                closedFileSet, openFilesMap, toolOut, maxOpenFiles);
+          } catch (Exception e) {
+            LOG.warn("Unexpected exception: " + e);
+          }
+          return false;
+        }
+      }, 1000, 60000);
+    } finally {
+      System.setOut(oldStreamOut);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testDecommissionWithOpenfileReporting()
+      throws Exception {
+    LOG.info("Starting test testDecommissionWithOpenfileReporting");
+
+    // Disable redundancy monitor check so that open files blocking
+    // decommission can be listed and verified.
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1000);
+    getConf().setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 1);
+
+    //At most 1 node can be decommissioned
+    startSimpleCluster(1, 4);
+
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+
+    final String[] closedFiles = new String[3];
+    final String[] openFiles = new String[3];
+    HashSet<Path> closedFileSet = new HashSet<>();
+    HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      closedFiles[i] = "/testDecommissionWithOpenfileReporting.closed." + i;
+      openFiles[i] = "/testDecommissionWithOpenfileReporting.open." + i;
+      writeFile(fileSys, new Path(closedFiles[i]), (short)3, 10);
+      closedFileSet.add(new Path(closedFiles[i]));
+      writeFile(fileSys, new Path(openFiles[i]), (short)3, 10);
+      FSDataOutputStream fdos =  fileSys.append(new Path(openFiles[i]));
+      openFilesMap.put(new Path(openFiles[i]), fdos);
+    }
+
+    HashMap<DatanodeInfo, Integer> dnInfoMap = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+          getCluster().getNameNode(0), openFiles[i], 0, blockSize * 10);
+      for (DatanodeInfo dn : lbs.getLastLocatedBlock().getLocations()) {
+        if (dnInfoMap.containsKey(dn)) {
+          dnInfoMap.put(dn, dnInfoMap.get(dn) + 1);
+        } else {
+          dnInfoMap.put(dn, 1);
+        }
+      }
+    }
+
+    DatanodeInfo dnToDecommission = null;
+    int maxDnOccurance = 0;
+    for (Map.Entry<DatanodeInfo, Integer> entry : dnInfoMap.entrySet()) {
+      if (entry.getValue() > maxDnOccurance) {
+        maxDnOccurance = entry.getValue();
+        dnToDecommission = entry.getKey();
+      }
+    }
+    LOG.info("XXX Dn to decommission: " + dnToDecommission + ", max: "
+        + maxDnOccurance);
+
+    //decommission one of the 3 nodes which have last block
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
+    ArrayList<String> nodes = new ArrayList<>();
+    dnToDecommission = dm.getDatanode(dnToDecommission.getDatanodeUuid());
+    nodes.add(dnToDecommission.getXferAddr());
+    initExcludeHosts(nodes);
+    refreshNodes(0);
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSION_INPROGRESS);
+
+    // list and verify all the open files that are blocking decommission
+    verifyOpenFilesBlockingDecommission(
+        closedFileSet, openFilesMap, maxDnOccurance);
+
+    final AtomicBoolean stopRedundancyMonitor = new AtomicBoolean(false);
+    Thread monitorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopRedundancyMonitor.get()) {
+          try {
+            BlockManagerTestUtil.checkRedundancy(
+                getCluster().getNamesystem().getBlockManager());
+            BlockManagerTestUtil.updateState(
+                getCluster().getNamesystem().getBlockManager());
+            Thread.sleep(1000);
+          } catch (Exception e) {
+            LOG.warn("Encountered exception during redundancy monitor: " + e);
+          }
+        }
+      }
+    });
+    monitorThread.start();
+
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSIONED);
+    stopRedundancyMonitor.set(true);
+    monitorThread.join();
+
+    // Open file is no more blocking decommission as all its blocks
+    // are re-replicated.
+    openFilesMap.clear();
+    verifyOpenFilesBlockingDecommission(
+        closedFileSet, openFilesMap, 0);
+  }
+
   @Test(timeout = 360000)
   public void testDecommissionWithOpenFileAndBlockRecovery()
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
index 685ea8b..3cb10bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -41,6 +42,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.junit.After;
 import org.junit.Assert;
@@ -254,7 +256,7 @@ public class TestHdfsAdmin {
     HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet());
     RemoteIterator<OpenFileEntry> openFilesRemoteItr =
-        hdfsAdmin.listOpenFiles();
+        hdfsAdmin.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
     while (openFilesRemoteItr.hasNext()) {
       String filePath = openFilesRemoteItr.next().getFilePath();
       assertFalse(filePath + " should not be listed under open files!",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 7ee766f..dfb40a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -168,7 +168,17 @@ public class BlockManagerTestUtil {
   public static int computeInvalidationWork(BlockManager bm) {
     return bm.computeInvalidateWork(Integer.MAX_VALUE);
   }
-  
+
+  /**
+   * Check the redundancy of blocks and trigger replication if needed.
+   * @param blockManager
+   */
+  public static void checkRedundancy(final BlockManager blockManager) {
+    blockManager.computeDatanodeWork();
+    blockManager.processPendingReconstructions();
+    blockManager.rescanPostponedMisreplicatedBlocks();
+  }
+
   /**
    * Compute all the replication and invalidation work for the
    * given BlockManager.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
index 55bc7c3..0a8da4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
@@ -206,7 +206,7 @@ public class TestLeaseManager {
         HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
         perm, 0L);
     when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
-    verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
+    verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0);
 
     for (Long iNodeId : iNodeIds) {
       INodeFile iNodeFile = stubInodeFile(iNodeId);
@@ -215,13 +215,13 @@ public class TestLeaseManager {
       when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
       lm.addLease("holder_" + iNodeId, iNodeId);
     }
-    verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(),
-        iNodeIds.size(), iNodeIds.size());
+    verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory,
+        iNodeIds.size(), iNodeIds.size(), iNodeIds.size());
 
     for (Long iNodeId : iNodeIds) {
       lm.removeLease(iNodeId);
     }
-    verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
+    verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0);
   }
 
   /**
@@ -246,41 +246,44 @@ public class TestLeaseManager {
 
     // Case 1: No open files
     int scale = 0;
-    testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
+    testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
+        rootInodeDirectory, scale);
 
     for (int workerCount = 1;
          workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2;
          workerCount++) {
       // Case 2: Open files count is half of worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
 
       // Case 3: Open files count is 1 less of worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
 
       // Case 4: Open files count is equal to worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
 
       // Case 5: Open files count is 1 more than worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
     }
 
     // Case 6: Open files count is way more than worker count
     scale = 1279;
-    testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
+    testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
+        rootInodeDirectory, scale);
   }
 
-  private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
-      final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
-      int scale) throws IOException {
-    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
+  private void testInodeWithLeasesAtScaleImpl(FSNamesystem fsNamesystem,
+      final LeaseManager leaseManager, final FSDirectory fsDirectory,
+      INodeDirectory ancestorDirectory, int scale) throws IOException {
+    verifyINodeLeaseCounts(
+        fsNamesystem, leaseManager, ancestorDirectory, 0, 0, 0);
 
     Set<Long> iNodeIds = new HashSet<>();
     for (int i = 0; i < scale; i++) {
@@ -293,11 +296,12 @@ public class TestLeaseManager {
       when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
       leaseManager.addLease("holder_" + iNodeId, iNodeId);
     }
-    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(),
-        iNodeIds.size(), iNodeIds.size());
+    verifyINodeLeaseCounts(fsNamesystem, leaseManager,
+        ancestorDirectory, iNodeIds.size(), iNodeIds.size(), iNodeIds.size());
 
     leaseManager.removeAllLeases();
-    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
+    verifyINodeLeaseCounts(fsNamesystem, leaseManager,
+        ancestorDirectory, 0, 0, 0);
   }
 
   /**
@@ -389,10 +393,10 @@ public class TestLeaseManager {
 
   }
 
-  private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
-      INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
-      int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount)
-      throws IOException {
+  private void verifyINodeLeaseCounts(FSNamesystem fsNamesystem,
+      LeaseManager leaseManager, INodeDirectory ancestorDirectory,
+      int iNodeIdWithLeaseCount, int iNodeWithLeaseCount,
+      int iNodeUnderAncestorLeaseCount) throws IOException {
     assertEquals(iNodeIdWithLeaseCount,
         leaseManager.getINodeIdWithLeases().size());
     assertEquals(iNodeWithLeaseCount,
@@ -401,6 +405,8 @@ public class TestLeaseManager {
         leaseManager.getINodeWithLeases(ancestorDirectory).size());
     assertEquals(iNodeIdWithLeaseCount,
         leaseManager.getUnderConstructionFiles(0).size());
+    assertEquals(0, (fsNamesystem.getFilesBlockingDecom(0) == null ?
+        0 : fsNamesystem.getFilesBlockingDecom(0).size()));
   }
 
   private Map<String, INode> createINodeTree(INodeDirectory parentDir,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
index b290194..cfee7ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -95,9 +97,13 @@ public class TestListOpenFiles {
     verifyOpenFiles(openFiles);
 
     BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries =
-        nnRpc.listOpenFiles(0);
+        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
     assertTrue("Open files list should be empty!",
         openFileEntryBatchedEntries.size() == 0);
+    BatchedEntries<OpenFileEntry> openFilesBlockingDecomEntries =
+        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
+    assertTrue("Open files list blocking decommission should be empty!",
+        openFilesBlockingDecomEntries.size() == 0);
 
     openFiles.putAll(
         DFSTestUtil.createOpenFiles(fs, "open-1", 1));
@@ -121,16 +127,16 @@ public class TestListOpenFiles {
     }
   }
 
-  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
-      throws IOException {
+  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet());
     OpenFileEntry lastEntry = null;
     BatchedEntries<OpenFileEntry> batchedEntries;
     do {
       if (lastEntry == null) {
-        batchedEntries = nnRpc.listOpenFiles(0);
+        batchedEntries = nnRpc.listOpenFiles(0, openFilesTypes);
       } else {
-        batchedEntries = nnRpc.listOpenFiles(lastEntry.getId());
+        batchedEntries = nnRpc.listOpenFiles(lastEntry.getId(), openFilesTypes);
       }
       assertTrue("Incorrect open files list size!",
           batchedEntries.size() <= BATCH_SIZE);
@@ -146,6 +152,13 @@ public class TestListOpenFiles {
         remainingFiles.size() == 0);
   }
 
+  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
+      throws IOException {
+    verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+    verifyOpenFiles(new HashMap<>(),
+        EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
+  }
+
   private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix,
       int numFilesToCreate) throws IOException {
     HashSet<Path> files = new HashSet<>();
@@ -197,6 +210,8 @@ public class TestListOpenFiles {
             try {
               assertEquals(0, ToolRunner.run(dfsAdmin,
                   new String[] {"-listOpenFiles"}));
+              assertEquals(0, ToolRunner.run(dfsAdmin,
+                  new String[] {"-listOpenFiles", "-blockingDecommission"}));
               // Sleep for some time to avoid
               // flooding logs with listing.
               Thread.sleep(listingIntervalMsec);
@@ -222,6 +237,8 @@ public class TestListOpenFiles {
 
       assertEquals(0, ToolRunner.run(dfsAdmin,
           new String[] {"-listOpenFiles"}));
+      assertEquals(0, ToolRunner.run(dfsAdmin,
+          new String[] {"-listOpenFiles", "-blockingDecommission"}));
       assertFalse("Client Error!", listOpenFilesError.get());
 
       clientThread.join();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org