You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/27 19:52:09 UTC

[01/14] hadoop git commit: YARN-4994. Use MiniYARNCluster with try-with-resources in tests. Contributed by Andras Bokor.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-5734 736f54b72 -> c0e0ef296


YARN-4994. Use MiniYARNCluster with try-with-resources in tests. Contributed by Andras Bokor.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae401539
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae401539
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae401539

Branch: refs/heads/YARN-5734
Commit: ae401539eaf7745ec8690f9281726fb4cdcdbe94
Parents: 736f54b
Author: Akira Ajisaka <aa...@apache.org>
Authored: Thu Dec 22 14:32:24 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Thu Dec 22 14:32:24 2016 +0900

----------------------------------------------------------------------
 .../jobhistory/TestJobHistoryEventHandler.java  | 10 +--
 .../hadoop/tools/TestHadoopArchiveLogs.java     | 12 +--
 .../tools/TestHadoopArchiveLogsRunner.java      | 11 +--
 ...stHedgingRequestRMFailoverProxyProvider.java | 57 +++++++-------
 .../yarn/client/api/impl/TestAMRMProxy.java     | 37 +++------
 .../hadoop/yarn/client/cli/TestYarnCLI.java     | 15 +---
 .../hadoop/yarn/server/TestMiniYarnCluster.java | 80 +++++++++-----------
 7 files changed, 85 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 54a2fad..0b33d6b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -563,11 +563,9 @@ public class TestJobHistoryEventHandler {
     TestParams t = new TestParams(RunningAppContext.class, false);
     Configuration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    MiniYARNCluster yarnCluster = null;
     long currentTime = System.currentTimeMillis();
-    try {
-      yarnCluster = new MiniYARNCluster(
-            TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1);
+    try (MiniYARNCluster yarnCluster = new MiniYARNCluster(
+        TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1)) {
       yarnCluster.init(conf);
       yarnCluster.start();
       Configuration confJHEH = new YarnConfiguration(conf);
@@ -720,10 +718,6 @@ public class TestJobHistoryEventHandler {
               tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
       Assert.assertEquals(TaskType.MAP.toString(),
               tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
-    } finally {
-      if (yarnCluster != null) {
-        yarnCluster.stop();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
index d2d7801..5b6062b 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
@@ -165,13 +165,11 @@ public class TestHadoopArchiveLogs {
 
   @Test(timeout = 30000)
   public void testFilterAppsByAggregatedStatus() throws Exception {
-    MiniYARNCluster yarnCluster = null;
-    try {
+    try (MiniYARNCluster yarnCluster =
+        new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(),
+            1, 1, 1, 1)) {
       Configuration conf = new Configuration();
       conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
-      yarnCluster =
-          new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
-              1, 1, 1);
       yarnCluster.init(conf);
       yarnCluster.start();
       conf = yarnCluster.getConfig();
@@ -237,10 +235,6 @@ public class TestHadoopArchiveLogs {
       Assert.assertTrue(hal.eligibleApplications.contains(app4));
       Assert.assertTrue(hal.eligibleApplications.contains(app7));
       Assert.assertTrue(hal.eligibleApplications.contains(app8));
-    } finally {
-      if (yarnCluster != null) {
-        yarnCluster.stop();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
index 098e2fd..fad9b97 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
@@ -52,16 +52,14 @@ public class TestHadoopArchiveLogsRunner {
 
   @Test(timeout = 50000)
   public void testHadoopArchiveLogs() throws Exception {
-    MiniYARNCluster yarnCluster = null;
     MiniDFSCluster dfsCluster = null;
     FileSystem fs = null;
-    try {
+    try (MiniYARNCluster yarnCluster =
+        new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
+            1, 2, 1, 1)) {
       Configuration conf = new YarnConfiguration();
       conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
       conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
-      yarnCluster =
-          new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
-              1, 2, 1, 1);
       yarnCluster.init(conf);
       yarnCluster.start();
       conf = yarnCluster.getConfig();
@@ -133,9 +131,6 @@ public class TestHadoopArchiveLogsRunner {
           harLogs[2].getOwner());
       Assert.assertEquals(0, fs.listStatus(workingDir).length);
     } finally {
-      if (yarnCluster != null) {
-        yarnCluster.stop();
-      }
       if (fs != null) {
         fs.close();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
index 30b409e..b55cad8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
@@ -35,8 +35,6 @@ public class TestHedgingRequestRMFailoverProxyProvider {
 
   @Test
   public void testHedgingRequestProxyProvider() throws Exception {
-    final MiniYARNCluster cluster =
-        new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1);
     Configuration conf = new YarnConfiguration();
 
     conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
@@ -49,41 +47,44 @@ public class TestHedgingRequestRMFailoverProxyProvider {
     conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
         2000);
 
-    HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
-    HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
-    HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
-    HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
-    HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
-    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+    try (MiniYARNCluster cluster =
+        new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1)) {
 
-    cluster.init(conf);
-    cluster.start();
+      HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
+      HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
+      HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
+      HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
+      HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
+      conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
 
-    final YarnClient client = YarnClient.createYarnClient();
-    client.init(conf);
-    client.start();
+      cluster.init(conf);
+      cluster.start();
 
-    // Transition rm5 to active;
-    long start = System.currentTimeMillis();
-    makeRMActive(cluster, 4);
+      final YarnClient client = YarnClient.createYarnClient();
+      client.init(conf);
+      client.start();
 
-    validateActiveRM(client);
+      // Transition rm5 to active;
+      long start = System.currentTimeMillis();
+      makeRMActive(cluster, 4);
 
-    long end = System.currentTimeMillis();
-    System.out.println("Client call succeeded at " + end);
-    // should return the response fast
-    Assert.assertTrue(end - start <= 10000);
+      validateActiveRM(client);
 
-    // transition rm5 to standby
-    cluster.getResourceManager(4).getRMContext().getRMAdminService()
-        .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
-            HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+      long end = System.currentTimeMillis();
+      System.out.println("Client call succeeded at " + end);
+      // should return the response fast
+      Assert.assertTrue(end - start <= 10000);
 
-    makeRMActive(cluster, 2);
+      // transition rm5 to standby
+      cluster.getResourceManager(4).getRMContext().getRMAdminService()
+          .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_USER));
 
-    validateActiveRM(client);
+      makeRMActive(cluster, 2);
 
-    cluster.stop();
+      validateActiveRM(client);
+
+    }
   }
 
   private void validateActiveRM(YarnClient client) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
index 33f7527..9eef9a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
@@ -60,11 +60,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
    */
   @Test(timeout = 120000)
   public void testAMRMProxyE2E() throws Exception {
-    MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
-    YarnClient rmClient = null;
     ApplicationMasterProtocol client;
 
-    try {
+    try (MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E",
+        1, 1, 1);
+            YarnClient rmClient = YarnClient.createYarnClient()) {
       Configuration conf = new YarnConfiguration();
       conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
       cluster.init(conf);
@@ -75,7 +75,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
 
       yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
           YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-      rmClient = YarnClient.createYarnClient();
       rmClient.init(yarnConf);
       rmClient.start();
 
@@ -135,11 +134,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
       Thread.sleep(500);
       Assert.assertNotEquals(RMAppState.FINISHED, rmApp.getState());
 
-    } finally {
-      if (rmClient != null) {
-        rmClient.stop();
-      }
-      cluster.stop();
     }
   }
 
@@ -150,12 +144,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
    */
   @Test(timeout = 120000)
   public void testE2ETokenRenewal() throws Exception {
-    MiniYARNCluster cluster =
-        new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
-    YarnClient rmClient = null;
     ApplicationMasterProtocol client;
 
-    try {
+    try (MiniYARNCluster cluster =
+        new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
+           YarnClient rmClient = YarnClient.createYarnClient()) {
       Configuration conf = new YarnConfiguration();
       conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
       conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1500);
@@ -170,7 +163,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
       final Configuration yarnConf = cluster.getConfig();
       yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
           YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-      rmClient = YarnClient.createYarnClient();
       rmClient.init(yarnConf);
       rmClient.start();
 
@@ -216,11 +208,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
       client.finishApplicationMaster(FinishApplicationMasterRequest
           .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
 
-    } finally {
-      if (rmClient != null) {
-        rmClient.stop();
-      }
-      cluster.stop();
     }
   }
 
@@ -230,11 +217,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
    */
   @Test(timeout = 120000)
   public void testE2ETokenSwap() throws Exception {
-    MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
-    YarnClient rmClient = null;
     ApplicationMasterProtocol client;
 
-    try {
+    try (MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap",
+        1, 1, 1);
+            YarnClient rmClient = YarnClient.createYarnClient()) {
       Configuration conf = new YarnConfiguration();
       conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
       cluster.init(conf);
@@ -242,7 +229,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
 
       // the client will connect to the RM with the token provided by AMRMProxy
       final Configuration yarnConf = cluster.getConfig();
-      rmClient = YarnClient.createYarnClient();
       rmClient.init(yarnConf);
       rmClient.start();
 
@@ -260,11 +246,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
             e.getMessage().startsWith("Invalid AMRMToken from appattempt_"));
       }
 
-    } finally {
-      if (rmClient != null) {
-        rmClient.stop();
-      }
-      cluster.stop();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index a677606..2e90581 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -1721,15 +1721,13 @@ public class TestYarnCLI {
         + "ProportionalCapacityPreemptionPolicy");
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
     conf.setBoolean(PREFIX + "root.a.a1.disable_preemption", true);
-    MiniYARNCluster cluster =
-        new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
 
-    YarnClient yarnClient = null;
-    try {
+    try (MiniYARNCluster cluster =
+        new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
+         YarnClient yarnClient = YarnClient.createYarnClient()) {
       cluster.init(conf);
       cluster.start();
       final Configuration yarnConf = cluster.getConfig();
-      yarnClient = YarnClient.createYarnClient();
       yarnClient.init(yarnConf);
       yarnClient.start();
 
@@ -1742,13 +1740,6 @@ public class TestYarnCLI {
       assertEquals(0, result);
       Assert.assertTrue(sysOutStream.toString()
           .contains("Preemption : disabled"));
-    } finally {
-      // clean-up
-      if (yarnClient != null) {
-        yarnClient.stop();
-      }
-      cluster.stop();
-      cluster.close();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
index 9226ead..ff7fafc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import java.io.IOException;
 
 public class TestMiniYarnCluster {
 
@@ -41,10 +42,11 @@ public class TestMiniYarnCluster {
      */
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
     enableAHS = false;
-    MiniYARNCluster cluster = null;
-    try {
-      cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
-          numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
+    try (MiniYARNCluster cluster =
+        new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
+            numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
+                enableAHS)) {
+
       cluster.init(conf);
       cluster.start();
 
@@ -52,11 +54,6 @@ public class TestMiniYarnCluster {
       Assert.assertNull("Timeline Service should not have been started",
           cluster.getApplicationHistoryServer());
     }
-    finally {
-      if(cluster != null) {
-        cluster.stop();
-      }
-    }
 
     /*
      * Timeline service should start if TIMELINE_SERVICE_ENABLED == true
@@ -64,10 +61,10 @@ public class TestMiniYarnCluster {
      */
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     enableAHS = false;
-    cluster = null;
-    try {
-      cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
-          numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
+    try (MiniYARNCluster cluster =
+        new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
+            numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
+                enableAHS)) {
       cluster.init(conf);
 
       // Verify that the timeline-service starts on ephemeral ports by default
@@ -87,21 +84,16 @@ public class TestMiniYarnCluster {
       Assert.assertNotNull("Timeline Service should have been started",
           cluster.getApplicationHistoryServer());
     }
-    finally {
-      if(cluster != null) {
-        cluster.stop();
-      }
-    }
     /*
      * Timeline service should start if TIMELINE_SERVICE_ENABLED == false
      * and enableAHS == true
      */
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
     enableAHS = true;
-    cluster = null;
-    try {
-      cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
-          numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
+    try (MiniYARNCluster cluster =
+        new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
+            numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
+                enableAHS)) {
       cluster.init(conf);
       cluster.start();
 
@@ -115,15 +107,10 @@ public class TestMiniYarnCluster {
       Assert.assertNotNull("Timeline Service should have been started",
           cluster.getApplicationHistoryServer());
     }
-    finally {
-      if(cluster != null) {
-        cluster.stop();
-      }
-    }
   }
 
   @Test
-  public void testMultiRMConf() {
+  public void testMultiRMConf() throws IOException {
     String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2";
     int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000;
     Configuration conf = new YarnConfiguration();
@@ -137,23 +124,28 @@ public class TestMiniYarnCluster {
     conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
     conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
 
-    MiniYARNCluster cluster =
+    try (MiniYARNCluster cluster =
         new MiniYARNCluster(TestMiniYarnCluster.class.getName(),
-            2, 0, 1, 1);
-    cluster.init(conf);
-    Configuration conf1 = cluster.getResourceManager(0).getConfig(),
-        conf2 = cluster.getResourceManager(1).getConfig();
-    Assert.assertFalse(conf1 == conf2);
-    Assert.assertEquals("0.0.0.0:18032",
-        conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
-    Assert.assertEquals("0.0.0.0:28032",
-        conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
-    Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
+            2, 0, 1, 1)) {
+      cluster.init(conf);
+      Configuration conf1 = cluster.getResourceManager(0).getConfig(),
+          conf2 = cluster.getResourceManager(1).getConfig();
+      Assert.assertFalse(conf1 == conf2);
+      Assert.assertEquals("0.0.0.0:18032",
+          conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+              RM1_NODE_ID)));
+      Assert.assertEquals("0.0.0.0:28032",
+          conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+              RM2_NODE_ID)));
+      Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
 
-    Assert.assertEquals("0.0.0.0:18032",
-        conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
-    Assert.assertEquals("0.0.0.0:28032",
-        conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
-    Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
+      Assert.assertEquals("0.0.0.0:18032",
+          conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+              RM1_NODE_ID)));
+      Assert.assertEquals("0.0.0.0:28032",
+          conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+              RM2_NODE_ID)));
+      Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[05/14] hadoop git commit: HADOOP-13863. Azure: Add a new SAS key mode for WASB. Contributed by Dushyanth

Posted by xg...@apache.org.
HADOOP-13863. Azure: Add a new SAS key mode for WASB. Contributed by Dushyanth


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e92a7709
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e92a7709
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e92a7709

Branch: refs/heads/YARN-5734
Commit: e92a77099b91620cee84513cc879089907468075
Parents: 22befbd
Author: Mingliang Liu <li...@apache.org>
Authored: Thu Dec 22 17:33:25 2016 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Thu Dec 22 20:15:18 2016 -0800

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |  33 +-
 .../fs/azure/AzureNativeFileSystemStore.java    |  84 ++-
 .../fs/azure/LocalSASKeyGeneratorImpl.java      | 263 +++++++++
 .../fs/azure/RemoteSASKeyGeneratorImpl.java     | 296 ++++++++++
 .../fs/azure/SASKeyGenerationException.java     |  40 ++
 .../hadoop/fs/azure/SASKeyGeneratorImpl.java    |  61 ++
 .../fs/azure/SASKeyGeneratorInterface.java      |  64 +++
 .../hadoop/fs/azure/SecureModeException.java    |  40 ++
 .../fs/azure/SecureStorageInterfaceImpl.java    | 565 +++++++++++++++++++
 .../fs/azure/WasbRemoteCallException.java       |  41 ++
 .../hadoop/fs/azure/WasbRemoteCallHelper.java   |  93 +++
 .../hadoop-azure/src/site/markdown/index.md     |  48 ++
 .../hadoop/fs/azure/TestContainerChecks.java    |  12 +-
 .../fs/azure/TestWasbUriAndConfiguration.java   |  17 +-
 .../src/test/resources/azure-test.xml           |  15 +-
 15 files changed, 1660 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 05004c1..b4a34db 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1222,6 +1222,38 @@
   <description>The implementation class of the S3A AbstractFileSystem.</description>
 </property>
 
+<!-- Azure file system properties -->
+<property>
+  <name>fs.azure.secure.mode</name>
+  <value>false</value>
+  <description>
+    Config flag to identify the mode in which fs.azure.NativeAzureFileSystem needs
+    to run under. Setting it "true" would make fs.azure.NativeAzureFileSystem use
+    SAS keys to communicate with Azure storage.
+  </description>
+</property>
+<property>
+  <name>fs.azure.local.sas.key.mode</name>
+  <value>false</value>
+  <description>
+    Works in conjuction with fs.azure.secure.mode. Setting this config to true
+    results in fs.azure.NativeAzureFileSystem using the local SAS key generation
+    where the SAS keys are generating in the same process as fs.azure.NativeAzureFileSystem.
+    If fs.azure.secure.mode flag is set to false, this flag has no effect.
+  </description>
+</property>
+<property>
+  <name>fs.azure.sas.expiry.period</name>
+  <value>90d</value>
+  <description>
+    The default value to be used for expiration period for SAS keys generated.
+    Can use the following suffix (case insensitive):
+    ms(millis), s(sec), m(min), h(hour), d(day)
+    to specify the time (such as 2s, 2m, 1h, etc.).
+  </description>
+</property>
+
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>
@@ -2420,5 +2452,4 @@
       in audit logs.
     </description>
   </property>
-
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index d232a2d..07c389c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
-import org.apache.hadoop.fs.azure.StorageInterfaceImpl.CloudPageBlobWrapperImpl;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
 import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
@@ -150,6 +149,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   private static final String KEY_ENABLE_STORAGE_CLIENT_LOGGING = "fs.azure.storage.client.logging";
 
+  /**
+   * Configuration keys to identify if WASB needs to run in Secure mode. In Secure mode
+   * all interactions with Azure storage is performed using SAS uris. There are two sub modes
+   * within the Secure mode , one is remote SAS key mode where the SAS keys are generated
+   * from a remote process and local mode where SAS keys are generated within WASB.
+   */
+  @VisibleForTesting
+  public static final String KEY_USE_SECURE_MODE = "fs.azure.secure.mode";
+
+  /**
+   * By default the SAS Key mode is expected to run in Romote key mode. This flags sets it
+   * to run on the local mode.
+   */
+  public static final String KEY_USE_LOCAL_SAS_KEY_MODE = "fs.azure.local.sas.key.mode";
+
   private static final String PERMISSION_METADATA_KEY = "hdi_permission";
   private static final String OLD_PERMISSION_METADATA_KEY = "asv_permission";
   private static final String IS_FOLDER_METADATA_KEY = "hdi_isfolder";
@@ -232,6 +246,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
 
   /**
+   * Default values to control SAS Key mode.
+   * By default we set the values to false.
+   */
+  private static final boolean DEFAULT_USE_SECURE_MODE = false;
+  private static final boolean DEFAULT_USE_LOCAL_SAS_KEY_MODE = false;
+
+  /**
    * Enable flat listing of blobs as default option. This is useful only if
    * listing depth is AZURE_UNBOUNDED_DEPTH.
    */
@@ -278,6 +299,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   // Set if we're running against a storage emulator..
   private boolean isStorageEmulator = false;
 
+  // Configs controlling WASB SAS key mode.
+  private boolean useSecureMode = false;
+  private boolean useLocalSasKeyMode = false;
+
+  private String delegationToken;
   /**
    * A test hook interface that can modify the operation context we use for
    * Azure Storage operations, e.g. to inject errors.
@@ -410,16 +436,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   @Override
   public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation)
       throws IllegalArgumentException, AzureException, IOException  {
-    
+
     if (null == instrumentation) {
       throw new IllegalArgumentException("Null instrumentation");
     }
     this.instrumentation = instrumentation;
 
-    if (null == this.storageInteractionLayer) {
-      this.storageInteractionLayer = new StorageInterfaceImpl();
-    }
-    
     // Check that URI exists.
     //
     if (null == uri) {
@@ -446,6 +468,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     sessionUri = uri;
     sessionConfiguration = conf;
 
+    useSecureMode = conf.getBoolean(KEY_USE_SECURE_MODE,
+        DEFAULT_USE_SECURE_MODE);
+    useLocalSasKeyMode = conf.getBoolean(KEY_USE_LOCAL_SAS_KEY_MODE,
+        DEFAULT_USE_LOCAL_SAS_KEY_MODE);
+
+    if (null == this.storageInteractionLayer) {
+      if (!useSecureMode) {
+        this.storageInteractionLayer = new StorageInterfaceImpl();
+      } else {
+        this.storageInteractionLayer = new SecureStorageInterfaceImpl(
+            useLocalSasKeyMode, conf, delegationToken);
+      }
+    }
+
     // Start an Azure storage session.
     //
     createAzureStorageSession();
@@ -791,6 +827,31 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   /**
+   * Method to set up the Storage Interaction layer in Secure mode.
+   * @param accountName - Storage account provided in the initializer
+   * @param containerName - Container name provided in the initializer
+   * @param sessionUri - URI provided in the initializer
+   */
+  private void connectToAzureStorageInSecureMode(String accountName,
+      String containerName, URI sessionUri)
+          throws AzureException, StorageException, URISyntaxException {
+
+    // Assertion: storageInteractionLayer instance has to be a SecureStorageInterfaceImpl
+    if (!(this.storageInteractionLayer instanceof SecureStorageInterfaceImpl)) {
+      throw new AssertionError("connectToAzureStorageInSASKeyMode() should be called only"
+        + " for SecureStorageInterfaceImpl instances");
+    }
+
+    ((SecureStorageInterfaceImpl) this.storageInteractionLayer).
+      setStorageAccountName(accountName);
+
+    container = storageInteractionLayer.getContainerReference(containerName);
+    rootDirectory = container.getDirectoryReference("");
+
+    canCreateOrModifyContainer = true;
+  }
+
+  /**
    * Connect to Azure storage using account key credentials.
    */
   private void connectUsingConnectionStringCredentials(
@@ -920,6 +981,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         return;
       }
 
+      // If the securemode flag is set, WASB uses SecureStorageInterfaceImpl instance
+      // to communicate with Azure storage. In SecureStorageInterfaceImpl SAS keys
+      // are used to communicate with Azure storage, so connectToAzureStorageInSecureMode
+      // instantiates the default container using a SAS Key.
+      if (useSecureMode) {
+        connectToAzureStorageInSecureMode(accountName, containerName, sessionUri);
+        return;
+      }
+
       // Check whether we have a shared access signature for that container.
       String propertyValue = sessionConfiguration.get(KEY_ACCOUNT_SAS_PREFIX
           + containerName + "." + accountName);
@@ -1330,7 +1400,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    */
   private OutputStream openOutputStream(final CloudBlobWrapper blob)
       throws StorageException {
-    if (blob instanceof CloudPageBlobWrapperImpl){
+    if (blob instanceof CloudPageBlobWrapper){
       return new PageBlobOutputStream(
           (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration);
     } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java
new file mode 100644
index 0000000..e6f1597
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPermissions;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import com.microsoft.azure.storage.SharedAccessAccountResourceType;
+import com.microsoft.azure.storage.SharedAccessAccountService;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+/***
+ * Local SAS Key Generation implementation. This class resides in
+ * the same address space as the WASB driver.
+ *
+ * This class gets typically used for testing purposes.
+ *
+ */
+
+public class LocalSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
+
+  /**
+   * Map to cache CloudStorageAccount instances.
+   */
+  private Map<String, CloudStorageAccount> storageAccountMap;
+
+  private static final int HOURS_IN_DAY = 24;
+  public LocalSASKeyGeneratorImpl(Configuration conf) {
+    super(conf);
+    storageAccountMap = new HashMap<String, CloudStorageAccount>();
+  }
+
+  /**
+   * Implementation to generate SAS Key for a container
+   */
+  @Override
+  public URI getContainerSASUri(String accountName, String container)
+      throws SASKeyGenerationException {
+
+    try {
+
+      CloudStorageAccount account =
+          getSASKeyBasedStorageAccountInstance(accountName);
+      CloudBlobClient client = account.createCloudBlobClient();
+      return client.getCredentials().transformUri(
+          client.getContainerReference(container).getUri());
+
+    } catch (StorageException stoEx) {
+      throw new SASKeyGenerationException("Encountered StorageException while"
+          + " generating SAS Key for container " + container + " inside "
+              + "storage account " + accountName, stoEx);
+    } catch (URISyntaxException uriSyntaxEx) {
+      throw new SASKeyGenerationException("Encountered URISyntaxException while"
+          + " generating SAS Key for container " + container + " inside storage"
+              + " account " + accountName, uriSyntaxEx);
+    }
+  }
+
+  /**
+   * Helper method that creates a CloudStorageAccount instance based on
+   *  SAS key for accountName
+   *
+   * @param accountName Storage Account Name
+   * @return CloudStorageAccount instance created using SAS key for
+   *   the Storage Account.
+   * @throws SASKeyGenerationException
+   */
+  private CloudStorageAccount getSASKeyBasedStorageAccountInstance(
+      String accountName) throws SASKeyGenerationException {
+
+    try {
+
+      String accountNameWithoutDomain =
+          getAccountNameWithoutDomain(accountName);
+
+      CloudStorageAccount account =
+          getStorageAccountInstance(accountNameWithoutDomain,
+              AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+                  accountName, getConf()));
+
+      return new CloudStorageAccount(
+          new StorageCredentialsSharedAccessSignature(
+              account.generateSharedAccessSignature(
+                  getDefaultAccountAccessPolicy())), false,
+                  account.getEndpointSuffix(), accountNameWithoutDomain);
+
+    } catch (KeyProviderException keyProviderEx) {
+      throw new SASKeyGenerationException("Encountered KeyProviderException"
+          + " while retrieving Storage key from configuration for account "
+          + accountName, keyProviderEx);
+    } catch (InvalidKeyException invalidKeyEx) {
+      throw new SASKeyGenerationException("Encoutered InvalidKeyException "
+          + "while generating Account level SAS key for account" + accountName,
+          invalidKeyEx);
+    } catch(StorageException storeEx) {
+      throw new SASKeyGenerationException("Encoutered StorageException while "
+          + "generating Account level SAS key for account" + accountName,
+            storeEx);
+    } catch(URISyntaxException uriSyntaxEx) {
+      throw new SASKeyGenerationException("Encountered URISyntaxException for"
+          + " account " + accountName, uriSyntaxEx);
+    }
+  }
+
+  /**
+   * Implementation for generation of Relative Path Blob SAS Uri.
+   */
+  @Override
+  public URI getRelativeBlobSASUri(String accountName, String container,
+      String relativePath) throws SASKeyGenerationException {
+
+    CloudBlobContainer sc = null;
+    CloudBlobClient client = null;
+    try {
+      CloudStorageAccount account =
+          getSASKeyBasedStorageAccountInstance(accountName);
+      client = account.createCloudBlobClient();
+      sc = client.getContainerReference(container);
+    } catch (URISyntaxException uriSyntaxEx) {
+      throw new SASKeyGenerationException("Encountered URISyntaxException "
+          + "while getting container references for container " + container
+          + " inside storage account : " + accountName, uriSyntaxEx);
+    } catch (StorageException stoEx) {
+      throw new SASKeyGenerationException("Encountered StorageException while "
+          + "getting  container references for container " + container
+          + " inside storage account : " + accountName, stoEx);
+    }
+
+    CloudBlockBlob blob = null;
+    try {
+      blob = sc.getBlockBlobReference(relativePath);
+    } catch (URISyntaxException uriSyntaxEx) {
+      throw new SASKeyGenerationException("Encountered URISyntaxException while "
+          + "getting Block Blob references for container " + container
+          + " inside storage account : " + accountName, uriSyntaxEx);
+    } catch (StorageException stoEx) {
+      throw new SASKeyGenerationException("Encountered StorageException while "
+          + "getting Block Blob references for container " + container
+          + " inside storage account : " + accountName, stoEx);
+    }
+
+    try {
+      return client.getCredentials().transformUri(blob.getUri());
+    } catch (StorageException stoEx) {
+      throw new SASKeyGenerationException("Encountered StorageException while "
+          + "generating SAS key for Blob: " + relativePath + " inside "
+          + "container : " + container + " in Storage Account : " + accountName,
+              stoEx);
+    } catch (URISyntaxException uriSyntaxEx) {
+      throw new SASKeyGenerationException("Encountered URISyntaxException "
+          + "while generating SAS key for Blob: " + relativePath + " inside "
+          + "container: " + container + " in Storage Account : " + accountName,
+              uriSyntaxEx);
+    }
+  }
+
+  /**
+   * Helper method that creates CloudStorageAccount Instance using the
+   * storage account key.
+   * @param accountName Name of the storage account
+   * @param accountKey Storage Account key
+   * @return CloudStorageAccount instance for the storage account.
+   * @throws SASKeyGenerationException
+   */
+  private CloudStorageAccount getStorageAccountInstance(String accountName,
+      String accountKey) throws SASKeyGenerationException {
+
+    if (!storageAccountMap.containsKey(accountName)) {
+
+      CloudStorageAccount account = null;
+      try {
+        account =
+            new CloudStorageAccount(new StorageCredentialsAccountAndKey(
+                accountName, accountKey));
+      } catch (URISyntaxException uriSyntaxEx) {
+        throw new SASKeyGenerationException("Encountered URISyntaxException "
+            + "for account " + accountName, uriSyntaxEx);
+      }
+
+      storageAccountMap.put(accountName, account);
+    }
+
+    return storageAccountMap.get(accountName);
+  }
+
+  /**
+   * Helper method that returns the Storage account name without
+   * the domain name suffix.
+   * @param fullAccountName Storage account name with domain name suffix
+   * @return String
+   */
+  private String getAccountNameWithoutDomain(String fullAccountName) {
+    StringTokenizer tokenizer = new StringTokenizer(fullAccountName, ".");
+    return tokenizer.nextToken();
+  }
+
+  /**
+   * Helper method to generate Access Policy for the Storage Account SAS Key
+   * @return SharedAccessAccountPolicy
+   */
+  private SharedAccessAccountPolicy getDefaultAccountAccessPolicy() {
+
+    SharedAccessAccountPolicy ap =
+        new SharedAccessAccountPolicy();
+
+    Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
+    cal.setTime(new Date());
+    cal.add(Calendar.HOUR, (int) getSasKeyExpiryPeriod() * HOURS_IN_DAY);
+
+    ap.setSharedAccessExpiryTime(cal.getTime());
+    ap.setPermissions(getDefaultAccoutSASKeyPermissions());
+    ap.setResourceTypes(EnumSet.of(SharedAccessAccountResourceType.CONTAINER,
+        SharedAccessAccountResourceType.OBJECT));
+    ap.setServices(EnumSet.of(SharedAccessAccountService.BLOB));
+
+    return ap;
+  }
+
+  private EnumSet<SharedAccessAccountPermissions> getDefaultAccoutSASKeyPermissions() {
+    return EnumSet.of(SharedAccessAccountPermissions.ADD,
+        SharedAccessAccountPermissions.CREATE,
+        SharedAccessAccountPermissions.DELETE,
+        SharedAccessAccountPermissions.LIST,
+        SharedAccessAccountPermissions.READ,
+        SharedAccessAccountPermissions.UPDATE,
+        SharedAccessAccountPermissions.WRITE);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
new file mode 100644
index 0000000..404419d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
+
+/**
+ * Class implementing a RemoteSASKeyGenerator. This class
+ * uses the url passed in via the Configuration to make a
+ * rest call to generate the required SAS Key.
+ */
+public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
+  /**
+   * Configuration parameter name expected in the Configuration
+   * object to provide the url of the remote service {@value}
+   */
+  private static final String KEY_CRED_SERVICE_URL =
+      "fs.azure.cred.service.url";
+
+  /**
+   * Container SAS Key generation OP name. {@value}
+   */
+  private static final String CONTAINER_SAS_OP = "GET_CONTAINER_SAS";
+
+  /**
+   * Relative Blob SAS Key generation OP name. {@value}
+   */
+  private static final String BLOB_SAS_OP = "GET_RELATIVE_BLOB_SAS";
+
+  /**
+   * Query parameter specifying the expiry period to be used for sas key
+   * {@value}
+   */
+  private static final String SAS_EXPIRY_QUERY_PARAM_NAME = "sas_expiry";
+
+  /**
+   * Query parameter name for the storage account. {@value}
+   */
+  private static final String STORAGE_ACCOUNT_QUERY_PARAM_NAME =
+      "storage_account";
+
+  /**
+   * Query parameter name for the storage account container. {@value}
+   */
+  private static final String CONTAINER_QUERY_PARAM_NAME =
+      "container";
+
+  /**
+   * Query parameter name for user info {@value}
+   */
+  private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
+      "delegation_token";
+
+  /**
+   * Query parameter name for the relative path inside the storage
+   * account container. {@value}
+   */
+  private static final String RELATIVE_PATH_QUERY_PARAM_NAME =
+      "relative_path";
+
+  private String delegationToken = "";
+  private String credServiceUrl = "";
+  private WasbRemoteCallHelper remoteCallHelper = null;
+
+  public RemoteSASKeyGeneratorImpl(Configuration conf) {
+    super(conf);
+  }
+
+  public boolean initialize(Configuration conf, String delegationToken) {
+
+    LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
+    credServiceUrl = conf.get(KEY_CRED_SERVICE_URL);
+
+    if (delegationToken == null || delegationToken.isEmpty()) {
+      LOG.error("Delegation Token not provided for initialization"
+          + " of RemoteSASKeyGenerator");
+      return false;
+    }
+
+    this.delegationToken = delegationToken;
+
+    if (credServiceUrl == null || credServiceUrl.isEmpty()) {
+      LOG.error("CredService Url not found in configuration to initialize"
+          + " RemoteSASKeyGenerator");
+      return false;
+    }
+
+    remoteCallHelper = new WasbRemoteCallHelper();
+    LOG.debug("Initialization of RemoteSASKeyGenerator instance successfull");
+    return true;
+  }
+
+  @Override
+  public URI getContainerSASUri(String storageAccount, String container)
+      throws SASKeyGenerationException {
+
+    try {
+
+      LOG.debug("Generating Container SAS Key for Container {} "
+          + "inside Storage Account {} ", container, storageAccount);
+      URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
+      uriBuilder.setPath("/" + CONTAINER_SAS_OP);
+      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
+          storageAccount);
+      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
+          container);
+      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+          + getSasKeyExpiryPeriod());
+      uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
+          this.delegationToken);
+
+      RemoteSASKeyGenerationResponse sasKeyResponse =
+          makeRemoteRequest(uriBuilder.build());
+
+      if (sasKeyResponse == null) {
+        throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+            + " object null from remote call");
+      } else if (sasKeyResponse.getResponseCode()
+          == REMOTE_CALL_SUCCESS_CODE) {
+        return new URI(sasKeyResponse.getSasKey());
+      } else {
+        throw new SASKeyGenerationException("Remote Service encountered error"
+            + " in SAS Key generation : "
+            + sasKeyResponse.getResponseMessage());
+      }
+    } catch (URISyntaxException uriSyntaxEx) {
+      throw new SASKeyGenerationException("Encountered URISyntaxException "
+          + "while building the HttpGetRequest to remote cred service",
+          uriSyntaxEx);
+    }
+  }
+
+  @Override
+  public URI getRelativeBlobSASUri(String storageAccount, String container,
+      String relativePath) throws SASKeyGenerationException {
+
+    try {
+
+      LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
+          + " Container {} inside Storage Account {} ",
+          relativePath, container, storageAccount);
+      URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
+      uriBuilder.setPath("/" + BLOB_SAS_OP);
+      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
+          storageAccount);
+      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
+          container);
+      uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME,
+          relativePath);
+      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+          + getSasKeyExpiryPeriod());
+      uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
+          this.delegationToken);
+
+      RemoteSASKeyGenerationResponse sasKeyResponse =
+          makeRemoteRequest(uriBuilder.build());
+
+      if (sasKeyResponse == null) {
+        throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+            + " object null from remote call");
+      } else if (sasKeyResponse.getResponseCode()
+          == REMOTE_CALL_SUCCESS_CODE) {
+        return new URI(sasKeyResponse.getSasKey());
+      } else {
+        throw new SASKeyGenerationException("Remote Service encountered error"
+            + " in SAS Key generation : "
+            + sasKeyResponse.getResponseMessage());
+      }
+    } catch (URISyntaxException uriSyntaxEx) {
+      throw new SASKeyGenerationException("Encountered URISyntaxException"
+          + " while building the HttpGetRequest to " + " remote service",
+          uriSyntaxEx);
+    }
+  }
+
+  /**
+   * Helper method to make a remote request.
+   * @param uri - Uri to use for the remote request
+   * @return RemoteSASKeyGenerationResponse
+   */
+  private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri)
+      throws SASKeyGenerationException {
+
+    try {
+      String responseBody =
+          remoteCallHelper.makeRemoteGetRequest(new HttpGet(uri));
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      return objectMapper.readValue(responseBody,
+          RemoteSASKeyGenerationResponse.class);
+
+    } catch (WasbRemoteCallException remoteCallEx) {
+      throw new SASKeyGenerationException("Encountered RemoteCallException"
+          + " while retrieving SAS key from remote service", remoteCallEx);
+    } catch (JsonParseException jsonParserEx) {
+      throw new SASKeyGenerationException("Encountered JsonParseException "
+          + "while parsing the response from remote"
+          + " service into RemoteSASKeyGenerationResponse object", jsonParserEx);
+    } catch (JsonMappingException jsonMappingEx) {
+      throw new SASKeyGenerationException("Encountered JsonMappingException"
+          + " while mapping the response from remote service into "
+          + "RemoteSASKeyGenerationResponse object", jsonMappingEx);
+    } catch (IOException ioEx) {
+      throw new SASKeyGenerationException("Encountered IOException while "
+          + "accessing remote service to retrieve SAS Key", ioEx);
+    }
+  }
+}
+
+/**
+ * POJO representing the response expected from a Remote
+ * SAS Key generation service.
+ * The remote SAS Key generation service is expected to
+ * return SAS key in json format:
+ * {
+ *    "responseCode" : 0 or non-zero <int>,
+ *    "responseMessage" : relavant message on failure <String>,
+ *    "sasKey" : Requested SAS Key <String>
+ * }
+ */
+class RemoteSASKeyGenerationResponse {
+
+  /**
+   * Response code for the call.
+   */
+  private int responseCode;
+
+  /**
+   * An intelligent message corresponding to
+   * result. Specifically in case of failure
+   * the reason for failure.
+   */
+  private String responseMessage;
+
+  /**
+   * SAS Key corresponding to the request.
+   */
+  private String sasKey;
+
+  public int getResponseCode() {
+    return responseCode;
+  }
+
+  public void setResponseCode(int responseCode) {
+    this.responseCode = responseCode;
+  }
+
+  public String getResponseMessage() {
+    return responseMessage;
+  }
+
+  public void setResponseMessage(String responseMessage) {
+    this.responseMessage = responseMessage;
+  }
+
+  public String getSasKey() {
+    return sasKey;
+  }
+
+  public void setSasKey(String sasKey) {
+    this.sasKey = sasKey;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java
new file mode 100644
index 0000000..7cfafc3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Exception that gets thrown during generation of SAS Key.
+ *
+ */
+public class SASKeyGenerationException extends AzureException {
+
+  private static final long serialVersionUID = 1L;
+
+  public SASKeyGenerationException(String message) {
+    super(message);
+  }
+
+  public SASKeyGenerationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SASKeyGenerationException(Throwable t) {
+    super(t);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java
new file mode 100644
index 0000000..4acd6e4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Abstract base class for the SAS Key Generator implementation
+ *
+ */
+public abstract class SASKeyGeneratorImpl implements SASKeyGeneratorInterface {
+
+  /**
+   * Configuration key to be used to specify the expiry period for SAS keys
+   * This value currently is specified in days. {@value}
+   */
+  public static final String KEY_SAS_KEY_EXPIRY_PERIOD =
+      "fs.azure.sas.expiry.period";
+
+  /**
+   * Default value for the SAS key expiry period in days. {@value}
+   */
+  public static final long DEFAUL_CONTAINER_SAS_KEY_PERIOD = 90;
+
+  private long sasKeyExpiryPeriod;
+
+  private Configuration conf;
+
+  public SASKeyGeneratorImpl(Configuration conf) {
+    this.conf = conf;
+    this.sasKeyExpiryPeriod = conf.getTimeDuration(
+        KEY_SAS_KEY_EXPIRY_PERIOD, DEFAUL_CONTAINER_SAS_KEY_PERIOD,
+        TimeUnit.DAYS);
+  }
+
+  public long getSasKeyExpiryPeriod() {
+    return sasKeyExpiryPeriod;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java
new file mode 100644
index 0000000..8d871eb
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+
+/**
+ * Iterface used by AzureNativeFileSysteStore to retrieve SAS Keys for the
+ * respective azure storage entity. This interface is expected to be
+ * implemented in two modes:
+ * 1) Local Mode: In this mode SAS Keys are generated
+ *    in same address space as the WASB. This will be primarily used for
+ *    testing purposes.
+ * 2) Remote Mode: In this mode SAS Keys are generated in a sepearte process
+ *    other than WASB and will be communicated via client.
+ */
+public interface SASKeyGeneratorInterface {
+
+  /**
+   * Interface method to retrieve SAS Key for a container within the storage
+   * account.
+   *
+   * @param accountName
+   *          - Storage account name
+   * @param container
+   *          - Container name within the storage account.
+   * @return SAS URI for the container.
+   * @throws SASKeyGenerationException
+   */
+  URI getContainerSASUri(String accountName, String container)
+      throws SASKeyGenerationException;
+
+  /**
+   * Interface method to retrieve SAS Key for a blob within the container of the
+   * storage account.
+   *
+   * @param accountName
+   *          - Storage account name
+   * @param container
+   *          - Container name within the storage account.
+   * @param relativePath
+   *          - Relative path within the container
+   * @return SAS URI for the relative path blob.
+   * @throws SASKeyGenerationException
+   */
+  URI getRelativeBlobSASUri(String accountName, String container,
+      String relativePath) throws SASKeyGenerationException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java
new file mode 100644
index 0000000..5bec77d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Exception that is thrown when any error is encountered
+ * is SAS Mode operation of WASB.
+ */
+public class SecureModeException extends AzureException {
+
+  private static final long serialVersionUID = 1L;
+
+  public SecureModeException(String message) {
+    super(message);
+  }
+
+  public SecureModeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SecureModeException(Throwable t) {
+    super(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
new file mode 100644
index 0000000..6749a76
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.RetryPolicyFactory;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageUri;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import com.microsoft.azure.storage.blob.CopyState;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.PageRange;
+import com.microsoft.azure.storage.blob.BlockEntry;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/***
+ * An implementation of the StorageInterface for SAS Key mode.
+ *
+ */
+
+public class SecureStorageInterfaceImpl extends StorageInterface {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      SecureStorageInterfaceImpl.class);
+  public static final String SAS_ERROR_CODE = "SAS Error";
+  private SASKeyGeneratorInterface sasKeyGenerator;
+  private String storageAccount;
+  private String delegationToken;
+
+  public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
+      Configuration conf, String delegationToken)
+          throws SecureModeException {
+
+    this.delegationToken = delegationToken;
+    if (useLocalSASKeyMode) {
+      this.sasKeyGenerator = new LocalSASKeyGeneratorImpl(conf);
+    } else {
+      RemoteSASKeyGeneratorImpl remoteSasKeyGenerator =
+          new RemoteSASKeyGeneratorImpl(conf);
+      if (!remoteSasKeyGenerator.initialize(conf, this.delegationToken)) {
+        throw new SecureModeException("Remote SAS Key mode could"
+            + " not be initialized");
+      }
+      this.sasKeyGenerator = remoteSasKeyGenerator;
+    }
+  }
+
+  @Override
+  public void setTimeoutInMs(int timeoutInMs) {
+  }
+
+  @Override
+  public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) {
+  }
+
+  @Override
+  public void createBlobClient(CloudStorageAccount account) {
+    String errorMsg = "createBlobClient is an invalid operation in"
+        + " SAS Key Mode";
+    LOG.error(errorMsg);
+    throw new UnsupportedOperationException(errorMsg);
+  }
+
+  @Override
+  public void createBlobClient(URI baseUri) {
+    String errorMsg = "createBlobClient is an invalid operation in "
+        + "SAS Key Mode";
+    LOG.error(errorMsg);
+    throw new UnsupportedOperationException(errorMsg);
+  }
+
+  @Override
+  public void createBlobClient(URI baseUri, StorageCredentials credentials) {
+    String errorMsg = "createBlobClient is an invalid operation in SAS "
+        + "Key Mode";
+    LOG.error(errorMsg);
+    throw new UnsupportedOperationException(errorMsg);
+  }
+
+  @Override
+  public StorageCredentials getCredentials() {
+    String errorMsg = "getCredentials is an invalid operation in SAS "
+        + "Key Mode";
+    LOG.error(errorMsg);
+    throw new UnsupportedOperationException(errorMsg);
+  }
+
+  @Override
+  public CloudBlobContainerWrapper getContainerReference(String name)
+      throws URISyntaxException, StorageException {
+
+    try {
+      return new SASCloudBlobContainerWrapperImpl(storageAccount,
+          new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
+              storageAccount, name)), sasKeyGenerator);
+    } catch (SASKeyGenerationException sasEx) {
+      String errorMsg = "Encountered SASKeyGeneration exception while "
+          + "generating SAS Key for container : " + name
+          + " inside Storage account : " + storageAccount;
+      LOG.error(errorMsg);
+      throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
+    }
+  }
+
+  public void setStorageAccountName(String storageAccount) {
+    this.storageAccount = storageAccount;
+  }
+
+  @InterfaceAudience.Private
+  static class SASCloudBlobContainerWrapperImpl
+    extends CloudBlobContainerWrapper {
+
+    private final CloudBlobContainer container;
+    private String storageAccount;
+    private SASKeyGeneratorInterface sasKeyGenerator;
+
+    public SASCloudBlobContainerWrapperImpl(String storageAccount,
+        CloudBlobContainer container, SASKeyGeneratorInterface sasKeyGenerator) {
+      this.storageAccount = storageAccount;
+      this.container = container;
+      this.sasKeyGenerator = sasKeyGenerator;
+    }
+
+    @Override
+    public String getName() {
+      return container.getName();
+    }
+
+    @Override
+    public boolean exists(OperationContext opContext) throws StorageException {
+      return container.exists(AccessCondition.generateEmptyCondition(), null,
+          opContext);
+    }
+
+    @Override
+    public void create(OperationContext opContext) throws StorageException {
+      container.create(null, opContext);
+    }
+
+    @Override
+    public HashMap<String, String> getMetadata() {
+      return container.getMetadata();
+    }
+
+    @Override
+    public void setMetadata(HashMap<String, String> metadata) {
+      container.setMetadata(metadata);
+    }
+
+    @Override
+    public void downloadAttributes(OperationContext opContext)
+        throws StorageException {
+      container.downloadAttributes(AccessCondition.generateEmptyCondition(),
+          null, opContext);
+    }
+
+    @Override
+    public void uploadMetadata(OperationContext opContext)
+        throws StorageException {
+      container.uploadMetadata(AccessCondition.generateEmptyCondition(), null,
+          opContext);
+    }
+
+    @Override
+    public CloudBlobDirectoryWrapper getDirectoryReference(String relativePath)
+        throws URISyntaxException, StorageException {
+
+      CloudBlobDirectory dir = container.getDirectoryReference(relativePath);
+      return new SASCloudBlobDirectoryWrapperImpl(dir);
+    }
+
+    @Override
+    public CloudBlobWrapper getBlockBlobReference(String relativePath)
+        throws URISyntaxException, StorageException {
+      try {
+        return new SASCloudBlockBlobWrapperImpl(
+            new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(
+                storageAccount, getName(), relativePath)));
+      } catch (SASKeyGenerationException sasEx) {
+        String errorMsg = "Encountered SASKeyGeneration exception while "
+            + "generating SAS Key for relativePath : " + relativePath
+            + " inside container : " + getName()  + " Storage account : " + storageAccount;
+        LOG.error(errorMsg);
+        throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
+      }
+    }
+
+    @Override
+    public CloudBlobWrapper getPageBlobReference(String relativePath)
+        throws URISyntaxException, StorageException {
+      try {
+        return new SASCloudPageBlobWrapperImpl(
+            new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(
+                storageAccount, getName(), relativePath)));
+      } catch (SASKeyGenerationException sasEx) {
+        String errorMsg = "Encountered SASKeyGeneration exception while "
+            + "generating SAS Key for relativePath : " + relativePath
+            + " inside container : " + getName()
+            + " Storage account : " + storageAccount;
+        LOG.error(errorMsg);
+        throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
+      }
+    }
+  }
+
+  //
+  // WrappingIterator
+  //
+
+  /**
+   * This iterator wraps every ListBlobItem as they come from the listBlobs()
+   * calls to their proper wrapping objects.
+   */
+  private static class SASWrappingIterator implements Iterator<ListBlobItem> {
+    private final Iterator<ListBlobItem> present;
+
+    public SASWrappingIterator(Iterator<ListBlobItem> present) {
+      this.present = present;
+    }
+
+    public static Iterable<ListBlobItem> wrap(
+        final Iterable<ListBlobItem> present) {
+      return new Iterable<ListBlobItem>() {
+        @Override
+        public Iterator<ListBlobItem> iterator() {
+          return new SASWrappingIterator(present.iterator());
+        }
+      };
+    }
+
+    @Override
+    public boolean hasNext() {
+      return present.hasNext();
+    }
+
+    @Override
+    public ListBlobItem next() {
+      ListBlobItem unwrapped = present.next();
+      if (unwrapped instanceof CloudBlobDirectory) {
+        return new SASCloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped);
+      } else if (unwrapped instanceof CloudBlockBlob) {
+        return new SASCloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped);
+      } else if (unwrapped instanceof CloudPageBlob) {
+        return new SASCloudPageBlobWrapperImpl((CloudPageBlob) unwrapped);
+      } else {
+        return unwrapped;
+      }
+    }
+
+    @Override
+    public void remove() {
+      present.remove();
+    }
+  }
+
+  //
+  // CloudBlobDirectoryWrapperImpl
+  //
+  @InterfaceAudience.Private
+  static class SASCloudBlobDirectoryWrapperImpl extends CloudBlobDirectoryWrapper {
+    private final CloudBlobDirectory directory;
+
+    public SASCloudBlobDirectoryWrapperImpl(CloudBlobDirectory directory) {
+      this.directory = directory;
+    }
+
+    @Override
+    public URI getUri() {
+      return directory.getUri();
+    }
+
+    @Override
+    public Iterable<ListBlobItem> listBlobs(String prefix,
+        boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
+        BlobRequestOptions options, OperationContext opContext)
+        throws URISyntaxException, StorageException {
+      return SASWrappingIterator.wrap(directory.listBlobs(prefix,
+          useFlatBlobListing, listingDetails, options, opContext));
+    }
+
+    @Override
+    public CloudBlobContainer getContainer() throws URISyntaxException,
+        StorageException {
+      return directory.getContainer();
+    }
+
+    @Override
+    public CloudBlobDirectory getParent() throws URISyntaxException,
+        StorageException {
+      return directory.getParent();
+    }
+
+    @Override
+    public StorageUri getStorageUri() {
+      return directory.getStorageUri();
+    }
+  }
+
+  abstract static class SASCloudBlobWrapperImpl implements CloudBlobWrapper {
+    private final CloudBlob blob;
+    @Override
+    public CloudBlob getBlob() {
+      return blob;
+    }
+
+    public URI getUri() {
+      return getBlob().getUri();
+    }
+
+    protected SASCloudBlobWrapperImpl(CloudBlob blob) {
+      this.blob = blob;
+    }
+
+    @Override
+    public HashMap<String, String> getMetadata() {
+      return getBlob().getMetadata();
+    }
+
+    @Override
+    public void delete(OperationContext opContext, SelfRenewingLease lease)
+        throws StorageException {
+      getBlob().delete(DeleteSnapshotsOption.NONE, getLeaseCondition(lease),
+          null, opContext);
+    }
+
+    /**
+     * Return and access condition for this lease, or else null if
+     * there's no lease.
+     */
+    private AccessCondition getLeaseCondition(SelfRenewingLease lease) {
+      AccessCondition leaseCondition = null;
+      if (lease != null) {
+        leaseCondition = AccessCondition.generateLeaseCondition(lease.getLeaseID());
+      }
+      return leaseCondition;
+    }
+
+    @Override
+    public boolean exists(OperationContext opContext)
+        throws StorageException {
+      return getBlob().exists(null, null, opContext);
+    }
+
+    @Override
+    public void downloadAttributes(
+        OperationContext opContext) throws StorageException {
+      getBlob().downloadAttributes(null, null, opContext);
+    }
+
+    @Override
+    public BlobProperties getProperties() {
+      return getBlob().getProperties();
+    }
+
+    @Override
+    public void setMetadata(HashMap<String, String> metadata) {
+      getBlob().setMetadata(metadata);
+    }
+
+    @Override
+    public InputStream openInputStream(
+        BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      return getBlob().openInputStream(null, options, opContext);
+    }
+
+    public OutputStream openOutputStream(
+        BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
+    }
+
+    public void upload(InputStream sourceStream, OperationContext opContext)
+        throws StorageException, IOException {
+      getBlob().upload(sourceStream, 0, null, null, opContext);
+    }
+
+    @Override
+    public CloudBlobContainer getContainer() throws URISyntaxException,
+        StorageException {
+      return getBlob().getContainer();
+    }
+
+    @Override
+    public CloudBlobDirectory getParent() throws URISyntaxException,
+        StorageException {
+      return getBlob().getParent();
+    }
+
+    @Override
+    public void uploadMetadata(OperationContext opContext)
+        throws StorageException {
+      uploadMetadata(null, null, opContext);
+    }
+
+    @Override
+    public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException{
+      getBlob().uploadMetadata(accessConditions, options, opContext);
+    }
+
+    public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
+        throws StorageException {
+
+      // Include lease in request if lease not null.
+      getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
+    }
+
+    @Override
+    public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
+      getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
+    }
+
+    @Override
+    public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
+      getBlob().setStreamWriteSizeInBytes(writeBlockSizeBytes);
+    }
+
+    @Override
+    public StorageUri getStorageUri() {
+      return getBlob().getStorageUri();
+    }
+
+    @Override
+    public CopyState getCopyState() {
+      return getBlob().getCopyState();
+    }
+
+    @Override
+    public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
+        OperationContext opContext)
+            throws StorageException, URISyntaxException {
+      getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
+          null, null, options, opContext);
+    }
+
+    @Override
+    public void downloadRange(long offset, long length, OutputStream outStream,
+        BlobRequestOptions options, OperationContext opContext)
+            throws StorageException, IOException {
+
+      getBlob().downloadRange(offset, length, outStream, null, options, opContext);
+    }
+
+    @Override
+    public SelfRenewingLease acquireLease() throws StorageException {
+      return new SelfRenewingLease(this);
+    }
+  }
+
+  //
+  // CloudBlockBlobWrapperImpl
+  //
+
+  static class SASCloudBlockBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudBlockBlobWrapper {
+
+    public SASCloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
+      super(blob);
+    }
+
+    public OutputStream openOutputStream(
+        BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
+    }
+
+    public void upload(InputStream sourceStream, OperationContext opContext)
+        throws StorageException, IOException {
+      getBlob().upload(sourceStream, 0, null, null, opContext);
+    }
+
+    public void uploadProperties(OperationContext opContext)
+        throws StorageException {
+      getBlob().uploadProperties(null, null, opContext);
+    }
+
+    @Override
+    public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
+
+    }
+
+    @Override
+    public void uploadBlock(String blockId, InputStream sourceStream,
+        long length, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+    }
+
+    @Override
+    public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
+    }
+  }
+
+  static class SASCloudPageBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudPageBlobWrapper {
+    public SASCloudPageBlobWrapperImpl(CloudPageBlob blob) {
+      super(blob);
+    }
+
+    public void create(final long length, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      ((CloudPageBlob) getBlob()).create(length, null, options, opContext);
+    }
+
+    public void uploadPages(final InputStream sourceStream, final long offset,
+        final long length, BlobRequestOptions options, OperationContext opContext)
+        throws StorageException, IOException {
+      ((CloudPageBlob) getBlob()).uploadPages(sourceStream, offset, length, null,
+          options, opContext);
+    }
+
+    public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      return ((CloudPageBlob) getBlob()).downloadPageRanges(
+          null, options, opContext);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java
new file mode 100644
index 0000000..43c1b61
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Exception that gets thrown when a remote call
+ * made from WASB to external cred service fails.
+ */
+
+public class WasbRemoteCallException extends AzureException {
+
+  private static final long serialVersionUID = 1L;
+
+  public WasbRemoteCallException(String message) {
+    super(message);
+  }
+
+  public WasbRemoteCallException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public WasbRemoteCallException(Throwable t) {
+    super(t);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
new file mode 100644
index 0000000..543c899
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+/**
+ * Helper class the has constants and helper methods
+ * used in WASB when integrating with a remote http cred
+ * service. Currently, remote service will be used to generate
+ * SAS keys.
+ */
+class WasbRemoteCallHelper {
+
+  /**
+   * Return code when the remote call is successful. {@value}
+   */
+  public static final int REMOTE_CALL_SUCCESS_CODE = 0;
+
+  /**
+   * Client instance to be used for making the remote call.
+   */
+  private HttpClient client = null;
+
+  public WasbRemoteCallHelper() {
+    this.client = HttpClientBuilder.create().build();
+  }
+
+  /**
+   * Helper method to make remote HTTP Get request.
+   * @param getRequest - HttpGet request object constructed by caller.
+   * @return Http Response body returned as a string. The caller
+   *  is expected to semantically understand the response.
+   * @throws WasbRemoteCallException
+   */
+  public String makeRemoteGetRequest(HttpGet getRequest)
+      throws WasbRemoteCallException {
+
+    try {
+
+      HttpResponse response = client.execute(getRequest);
+
+      if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+        throw new WasbRemoteCallException(
+            response.getStatusLine().toString());
+      }
+
+      BufferedReader rd = new BufferedReader(
+          new InputStreamReader(response.getEntity().getContent(),
+              StandardCharsets.UTF_8));
+      StringBuilder responseBody = new StringBuilder();
+      String responseLine = "";
+      while ((responseLine = rd.readLine()) != null) {
+        responseBody.append(responseLine);
+      }
+      rd.close();
+      return responseBody.toString();
+
+    } catch (ClientProtocolException clientProtocolEx) {
+      throw new WasbRemoteCallException("Encountered ClientProtocolException"
+          + " while making remote call", clientProtocolEx);
+    } catch (IOException ioEx) {
+      throw new WasbRemoteCallException("Encountered IOException while making"
+          + " remote call", ioEx);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 5d71795..2865223 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -285,6 +285,54 @@ To enable 20 threads for Rename operation. Set configuration value to 0 or 1 to
       <value>20</value>
     </property>
 
+### <a name="WASB_SECURE_MODE" />WASB Secure mode and configuration
+
+WASB can operate in secure mode where the Storage access keys required to communicate with Azure storage does not have to
+be in the same address space as the process using WASB. In this mode all interactions with Azure storage is performed using
+SAS uris. There are two sub modes within the Secure mode, one is remote SAS key mode where the SAS keys are generated from
+a remote process and local mode where SAS keys are generated within WASB. By default the SAS Key mode is expected to run in
+Romote mode, however for testing purposes the local mode can be enabled to generate SAS keys in the same process as WASB.
+
+To enable Secure mode following property needs to be set to true.
+
+```
+    <property>
+      <name>fs.azure.secure.mode</name>
+      <value>true</value>
+    </property>
+```
+
+To enable SAS key generation locally following property needs to be set to true.
+
+```
+    <property>
+      <name>fs.azure.local.sas.key.mode</name>
+      <value>true</value>
+    </property>
+```
+To use the remote SAS key generation mode, an external REST service is expected to provided required SAS keys.
+Following property can used to provide the end point to use for remote SAS Key generation:
+
+```
+    <property>
+      <name>fs.azure.cred.service.url</name>
+      <value>{URL}</value>
+    </property>
+```
+The remote service is expected to provide support for two REST calls ```{URL}/GET_CONTAINER_SAS``` and ```{URL}/GET_RELATIVE_BLOB_SAS```, for generating
+container and relative blob sas keys. An example requests
+
+```{URL}/GET_CONTAINER_SAS?storage_account=<account_name>&container=<container>&sas_expiry=<expiry period>&delegation_token=<delegation token>```
+```{URL}/GET_CONTAINER_SAS?storage_account=<account_name>&container=<container>&relative_path=<relative path>&sas_expiry=<expiry period>&delegation_token=<delegation token>```
+
+The service is expected to return a response in JSON format:
+```
+{
+  "responseCode" : 0 or non-zero <int>,
+  "responseMessage" : relavant message on failure <String>,
+  "sasKey" : Requested SAS Key <String>
+}
+```
 ## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
 
 The hadoop-azure module includes a full suite of unit tests.  Most of the tests

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
index 56ec881..f6ab94d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
 import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.microsoft.azure.storage.blob.BlobOutputStream;
@@ -41,7 +43,7 @@ import com.microsoft.azure.storage.blob.CloudBlockBlob;
  */
 public class TestContainerChecks {
   private AzureBlobStorageTestAccount testAccount;
-
+  private boolean runningInSASMode = false;
   @After
   public void tearDown() throws Exception {
     if (testAccount != null) {
@@ -50,6 +52,12 @@ public class TestContainerChecks {
     }
   }
 
+  @Before
+  public void setMode() {
+    runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
+        getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+  }
+
   @Test
   public void testContainerExistAfterDoesNotExist() throws Exception {
     testAccount = AzureBlobStorageTestAccount.create("",
@@ -155,6 +163,8 @@ public class TestContainerChecks {
 
   @Test
   public void testContainerChecksWithSas() throws Exception {
+
+    Assume.assumeFalse(runningInSASMode);
     testAccount = AzureBlobStorageTestAccount.create("",
         EnumSet.of(CreateOptions.UseSas));
     assumeNotNull(testAccount);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
index cd9d1d4..9d2770e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
@@ -39,7 +39,6 @@ import java.io.File;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.alias.CredentialProvider;
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FileContext;
@@ -48,6 +47,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -63,7 +64,7 @@ public class TestWasbUriAndConfiguration {
   protected String accountName;
   protected String accountKey;
   protected static Configuration conf = null;
-
+  private boolean runningInSASMode = false;
   @Rule
   public final TemporaryFolder tempDir = new TemporaryFolder();
 
@@ -77,6 +78,12 @@ public class TestWasbUriAndConfiguration {
     }
   }
 
+  @Before
+  public void setMode() {
+    runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
+        getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+  }
+
   private boolean validateIOStreams(Path filePath) throws IOException {
     // Capture the file system from the test account.
     FileSystem fs = testAccount.getFileSystem();
@@ -128,6 +135,8 @@ public class TestWasbUriAndConfiguration {
 
   @Test
   public void testConnectUsingSAS() throws Exception {
+
+    Assume.assumeFalse(runningInSASMode);
     // Create the test account with SAS credentials.
     testAccount = AzureBlobStorageTestAccount.create("",
         EnumSet.of(CreateOptions.UseSas, CreateOptions.CreateContainer));
@@ -142,6 +151,8 @@ public class TestWasbUriAndConfiguration {
 
   @Test
   public void testConnectUsingSASReadonly() throws Exception {
+
+    Assume.assumeFalse(runningInSASMode);
     // Create the test account with SAS credentials.
     testAccount = AzureBlobStorageTestAccount.create("", EnumSet.of(
         CreateOptions.UseSas, CreateOptions.CreateContainer,
@@ -318,6 +329,8 @@ public class TestWasbUriAndConfiguration {
 
   @Test
   public void testCredsFromCredentialProvider() throws Exception {
+
+    Assume.assumeFalse(runningInSASMode);
     String account = "testacct";
     String key = "testkey";
     // set up conf to have a cred provider

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
index 00611fc..e898aa6 100644
--- a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
+++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
@@ -17,15 +17,28 @@
 <configuration xmlns:xi="http://www.w3.org/2001/XInclude">
 
   <!-- For tests against live azure, provide the following account information -->
+
   <!--
   <property>
     <name>fs.azure.test.account.name</name>
-      <value>{ACCOUNTNAME}.blob.core.windows.net</value>
+    <value>{ACCOUNTNAME}.blob.core.windows.net</value>
   </property>
   <property>
     <name>fs.azure.account.key.{ACCOUNTNAME}.blob.core.windows.net</name>
     <value>{ACCOUNTKEY}</value>
   </property>
+  <property>
+    <name>fs.azure.secure.mode</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>fs.azure.local.sas.key.mode</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>fs.azure.cred.service.url</name>
+    <value>{CRED_SERIVCE_URL}</value>
+  </property>
   -->
 
   <!-- Save the above configuration properties in a separate file named -->


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[07/14] hadoop git commit: YARN-6000. Make AllocationFileLoaderService.Listener public. (Tao Jie via kasha)

Posted by xg...@apache.org.
YARN-6000. Make AllocationFileLoaderService.Listener public. (Tao Jie via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d3f73ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d3f73ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d3f73ac

Branch: refs/heads/YARN-5734
Commit: 4d3f73acc0a5cabc748132889dbe670bea178a3f
Parents: 4e90296
Author: Karthik Kambatla <ka...@apache.org>
Authored: Fri Dec 23 11:40:56 2016 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Fri Dec 23 11:40:56 2016 -0800

----------------------------------------------------------------------
 .../scheduler/fair/AllocationFileLoaderService.java                | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d3f73ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 3aecbfd..cd4a19b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -639,7 +639,7 @@ public class AllocationFileLoaderService extends AbstractService {
     return defaultPermissions;
   }
 
-  interface Listener {
+  public interface Listener {
     void onReload(AllocationConfiguration info) throws IOException;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[14/14] hadoop git commit: HDFS-11270. Document the missing options of NameNode bootstrap command. Contributed by Yiqun Lin

Posted by xg...@apache.org.
HDFS-11270. Document the missing options of NameNode bootstrap command. Contributed by Yiqun Lin


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0e0ef29
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0e0ef29
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0e0ef29

Branch: refs/heads/YARN-5734
Commit: c0e0ef29696109af9a018462059f08fd99ee3121
Parents: 0665c5f
Author: Mingliang Liu <li...@apache.org>
Authored: Tue Dec 27 11:21:10 2016 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Tue Dec 27 11:21:10 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/server/common/HdfsServerConstants.java    |  1 +
 .../org/apache/hadoop/hdfs/server/namenode/NameNode.java  |  5 ++++-
 .../hadoop/hdfs/server/namenode/ha/BootstrapStandby.java  | 10 ++++++++--
 .../hadoop-hdfs/src/site/markdown/HDFSCommands.md         |  4 ++--
 4 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index d112a48..c9a46f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -156,6 +156,7 @@ public interface HdfsServerConstants {
     RECOVER  ("-recover"),
     FORCE("-force"),
     NONINTERACTIVE("-nonInteractive"),
+    SKIPSHAREDEDITSCHECK("-skipSharedEditsCheck"),
     RENAMERESERVED("-renameReserved"),
     METADATAVERSION("-metadataVersion"),
     UPGRADEONLY("-upgradeOnly"),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index d825177..f6c724b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -307,7 +307,10 @@ public class NameNode extends ReconfigurableBase implements
       + RollingUpgradeStartupOption.getAllOptionString() + " ] | \n\t["
       + StartupOption.IMPORT.getName() + "] | \n\t["
       + StartupOption.INITIALIZESHAREDEDITS.getName() + "] | \n\t["
-      + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | \n\t["
+      + StartupOption.BOOTSTRAPSTANDBY.getName() + " ["
+      + StartupOption.FORCE.getName() + "] ["
+      + StartupOption.NONINTERACTIVE.getName() + "] ["
+      + StartupOption.SKIPSHAREDEDITSCHECK.getName() + "] ] | \n\t["
       + StartupOption.RECOVER.getName() + " [ "
       + StartupOption.FORCE.getName() + "] ] | \n\t["
       + StartupOption.METADATAVERSION.getName() + " ]";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
index 5e0f416..4d6716f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -136,8 +136,14 @@ public class BootstrapStandby implements Tool, Configurable {
   }
 
   private void printUsage() {
-    System.err.println("Usage: " + this.getClass().getSimpleName() +
-        " [-force] [-nonInteractive] [-skipSharedEditsCheck]");
+    System.out.println("Usage: " + this.getClass().getSimpleName() +
+        " [-force] [-nonInteractive] [-skipSharedEditsCheck]\n"
+        + "\t-force: formats if the name directory exists.\n"
+        + "\t-nonInteractive: formats aborts if the name directory exists,\n"
+        + "\tunless -force option is specified.\n"
+        + "\t-skipSharedEditsCheck: skips edits check which ensures that\n"
+        + "\twe have enough edits already in the shared directory to start\n"
+        + "\tup from the last checkpoint on the active.");
   }
 
   private NamenodeProtocol createNNProtocolProxy(InetSocketAddress otherIpcAddr)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index a0d0ed7..7b00e9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -535,7 +535,7 @@ Usage:
               [-rollingUpgrade <rollback |started> ] |
               [-importCheckpoint] |
               [-initializeSharedEdits] |
-              [-bootstrapStandby] |
+              [-bootstrapStandby [-force] [-nonInteractive] [-skipSharedEditsCheck] ] |
               [-recover [-force] ] |
               [-metadataVersion ]
 
@@ -550,7 +550,7 @@ Usage:
 | `-rollingUpgrade` \<rollback\|started\> | See [Rolling Upgrade document](./HdfsRollingUpgrade.html#NameNode_Startup_Options) for the detail. |
 | `-importCheckpoint` | Loads image from a checkpoint directory and save it into the current one. Checkpoint dir is read from property dfs.namenode.checkpoint.dir |
 | `-initializeSharedEdits` | Format a new shared edits dir and copy in enough edit log segments so that the standby NameNode can start up. |
-| `-bootstrapStandby` | Allows the standby NameNode's storage directories to be bootstrapped by copying the latest namespace snapshot from the active NameNode. This is used when first configuring an HA cluster. |
+| `-bootstrapStandby` `[-force]` `[-nonInteractive]` `[-skipSharedEditsCheck]` | Allows the standby NameNode's storage directories to be bootstrapped by copying the latest namespace snapshot from the active NameNode. This is used when first configuring an HA cluster. The option -force or -nonInteractive has the same meaning as that described in namenode -format command. -skipSharedEditsCheck option skips edits check which ensures that we have enough edits already in the shared directory to start up from the last checkpoint on the active. |
 | `-recover` `[-force]` | Recover lost metadata on a corrupt filesystem. See [HDFS User Guide](./HdfsUserGuide.html#Recovery_Mode) for the detail. |
 | `-metadataVersion` | Verify that configured directories exist, then print the metadata versions of the software and the image. |
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[06/14] hadoop git commit: HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou.

Posted by xg...@apache.org.
HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4e902965
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4e902965
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4e902965

Branch: refs/heads/YARN-5734
Commit: 4e9029653dfa7a803d73c173cb7044f7e0dc1eb1
Parents: e92a770
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Dec 22 23:46:58 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Dec 22 23:46:58 2016 -0800

----------------------------------------------------------------------
 .../hadoop/metrics2/MetricsJsonBuilder.java     | 125 +++++++++
 .../lib/MutableRatesWithAggregation.java        |  40 ++-
 .../hadoop/metrics2/lib/RollingAverages.java    | 251 +++++++++++++++++++
 .../metrics2/lib/TestRollingAverages.java       | 123 +++++++++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  11 +
 .../hdfs/server/datanode/BlockReceiver.java     |  48 +++-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  14 +-
 .../hdfs/server/datanode/DataNodeMXBean.java    |  12 +
 .../hdfs/server/datanode/DataXceiver.java       |   9 +
 .../datanode/metrics/DataNodePeerMetrics.java   | 117 +++++++++
 .../src/main/resources/hdfs-default.xml         |  25 ++
 .../datanode/TestDataNodePeerMetrics.java       |  92 +++++++
 12 files changed, 850 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
new file mode 100644
index 0000000..8e42909
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Build a JSON dump of the metrics.
+ *
+ * The {@link #toString()} operator dumps out all values collected.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MetricsJsonBuilder extends MetricsRecordBuilder {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(MetricsRecordBuilder.class);
+  private final MetricsCollector parent;
+  private Map<String, Object> innerMetrics = new LinkedHashMap<>();
+
+  /**
+   * Build an instance.
+   * @param parent parent collector. Unused in this instance; only used for
+   * the {@link #parent()} method
+   */
+  public MetricsJsonBuilder(MetricsCollector parent) {
+    this.parent = parent;
+  }
+
+  private MetricsRecordBuilder tuple(String key, Object value) {
+    innerMetrics.put(key, value);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder add(MetricsTag tag) {
+    return tuple(tag.name(), tag.value());
+  }
+
+  @Override
+  public MetricsRecordBuilder add(AbstractMetric metric) {
+    return tuple(metric.info().name(), metric.toString());
+  }
+
+  @Override
+  public MetricsRecordBuilder setContext(String value) {
+    return tuple("context", value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsCollector parent() {
+    return parent;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return new ObjectMapper().writeValueAsString(innerMetrics);
+    } catch (IOException e) {
+      LOG.warn("Failed to dump to Json.", e);
+      return ExceptionUtils.getStackTrace(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
index 64eae03..9827ca7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.metrics2.lib;
 import com.google.common.collect.Sets;
 import java.lang.ref.WeakReference;
 import java.lang.reflect.Method;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -50,7 +49,8 @@ import org.apache.hadoop.metrics2.util.SampleStat;
 @InterfaceStability.Evolving
 public class MutableRatesWithAggregation extends MutableMetric {
   static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);
-  private final Map<String, MutableRate> globalMetrics = new HashMap<>();
+  private final Map<String, MutableRate> globalMetrics =
+      new ConcurrentHashMap<>();
   private final Set<Class<?>> protocolCache = Sets.newHashSet();
 
   private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>>
@@ -107,12 +107,7 @@ public class MutableRatesWithAggregation extends MutableMetric {
         // Thread has died; clean up its state
         iter.remove();
       } else {
-        // Aggregate the thread's local samples into the global metrics
-        for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
-          String name = entry.getKey();
-          MutableRate globalMetric = addMetricIfNotExists(name);
-          entry.getValue().snapshotInto(globalMetric);
-        }
+        aggregateLocalStatesToGlobalMetrics(map);
       }
     }
     for (MutableRate globalMetric : globalMetrics.values()) {
@@ -120,6 +115,35 @@ public class MutableRatesWithAggregation extends MutableMetric {
     }
   }
 
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  synchronized void collectThreadLocalStates() {
+    final ConcurrentMap<String, ThreadSafeSampleStat> localStats =
+        threadLocalMetricsMap.get();
+    if (localStats != null) {
+      aggregateLocalStatesToGlobalMetrics(localStats);
+    }
+  }
+
+  /**
+   * Aggregates the thread's local samples into the global metrics. The caller
+   * should ensure its thread safety.
+   */
+  private void aggregateLocalStatesToGlobalMetrics(
+      final ConcurrentMap<String, ThreadSafeSampleStat> localStats) {
+    for (Map.Entry<String, ThreadSafeSampleStat> entry : localStats
+        .entrySet()) {
+      String name = entry.getKey();
+      MutableRate globalMetric = addMetricIfNotExists(name);
+      entry.getValue().snapshotInto(globalMetric);
+    }
+  }
+
+  Map<String, MutableRate> getGlobalMetrics() {
+    return globalMetrics;
+  }
+
   private synchronized MutableRate addMetricIfNotExists(String name) {
     MutableRate metric = globalMetrics.get(name);
     if (metric == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
new file mode 100644
index 0000000..06ae30d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.metrics2.lib.Interns.*;
+
+/**
+ * <p>
+ * This class maintains a group of rolling average metrics. It implements the
+ * algorithm of rolling average, i.e. a number of sliding windows are kept to
+ * roll over and evict old subsets of samples. Each window has a subset of
+ * samples in a stream, where sub-sum and sub-total are collected. All sub-sums
+ * and sub-totals in all windows will be aggregated to final-sum and final-total
+ * used to compute final average, which is called rolling average.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RollingAverages extends MutableMetric implements Closeable {
+
+  private final MutableRatesWithAggregation innerMetrics =
+      new MutableRatesWithAggregation();
+
+  private static final ScheduledExecutorService SCHEDULER = Executors
+      .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("RollingAverages-%d").build());
+
+  private ScheduledFuture<?> scheduledTask = null;
+  private Map<String, MutableRate> currentSnapshot;
+  private final int numWindows;
+  private final String avgInfoNameTemplate;
+  private final String avgInfoDescTemplate;
+
+  private static class SumAndCount {
+    private final double sum;
+    private final long count;
+
+    public SumAndCount(final double sum, final long count) {
+      this.sum = sum;
+      this.count = count;
+    }
+
+    public double getSum() {
+      return sum;
+    }
+
+    public long getCount() {
+      return count;
+    }
+  }
+
+  /**
+   * <p>
+   * key: metric name
+   * </p>
+   * <p>
+   * value: deque where sub-sums and sub-totals for sliding windows are
+   * maintained.
+   * </p>
+   */
+  private Map<String, LinkedBlockingDeque<SumAndCount>> averages =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Constructor of {@link RollingAverages}.
+   * @param windowSize
+   *          The number of seconds of each window for which sub set of samples
+   *          are gathered to compute the rolling average, A.K.A. roll over
+   *          interval.
+   * @param numWindows
+   *          The number of windows maintained to compute the rolling average.
+   * @param valueName
+   *          of the metric (e.g. "Time", "Latency")
+   */
+  public RollingAverages(
+      final int windowSize,
+      final int numWindows,
+      final String valueName) {
+    String uvName = StringUtils.capitalize(valueName);
+    String lvName = StringUtils.uncapitalize(valueName);
+    avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName;
+    avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
+    this.numWindows = numWindows;
+    scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
+        windowSize, windowSize, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Constructor of {@link RollingAverages}.
+   * @param windowSize
+   *          The number of seconds of each window for which sub set of samples
+   *          are gathered to compute rolling average, also A.K.A roll over
+   *          interval.
+   * @param numWindows
+   *          The number of windows maintained in the same time to compute the
+   *          average of the rolling averages.
+   */
+  public RollingAverages(
+      final int windowSize,
+      final int numWindows) {
+    this(windowSize, numWindows, "Time");
+  }
+
+  @Override
+  public void snapshot(MetricsRecordBuilder builder, boolean all) {
+    if (all || changed()) {
+      for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+          : averages.entrySet()) {
+        final String name = entry.getKey();
+        final MetricsInfo avgInfo = info(
+            String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
+            String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
+        double totalSum = 0;
+        long totalCount = 0;
+
+        for (final SumAndCount sumAndCount : entry.getValue()) {
+          totalCount += sumAndCount.getCount();
+          totalSum += sumAndCount.getSum();
+        }
+
+        if (totalCount != 0) {
+          builder.addGauge(avgInfo, totalSum / totalCount);
+        }
+      }
+      if (changed()) {
+        clearChanged();
+      }
+    }
+  }
+
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  public void collectThreadLocalStates() {
+    innerMetrics.collectThreadLocalStates();
+  }
+
+  /**
+   * @param name
+   *          name of metric
+   * @param value
+   *          value of metric
+   */
+  public void add(final String name, final long value) {
+    innerMetrics.add(name, value);
+  }
+
+  private static class RatesRoller implements Runnable {
+    private final RollingAverages parent;
+
+    public RatesRoller(final RollingAverages parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void run() {
+      synchronized (parent) {
+        final MetricsCollectorImpl mc = new MetricsCollectorImpl();
+        final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
+        /**
+         * snapshot all metrics regardless of being changed or not, in case no
+         * ops since last snapshot, we will get 0.
+         */
+        parent.innerMetrics.snapshot(rb, true);
+        Preconditions.checkState(mc.getRecords().size() == 1,
+            "There must be only one record and it's named with 'RatesRoller'");
+
+        parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
+        parent.rollOverAvgs();
+      }
+      parent.setChanged();
+    }
+  }
+
+  /**
+   * Iterates over snapshot to capture all Avg metrics into rolling structure
+   * {@link RollingAverages#averages}.
+   */
+  private void rollOverAvgs() {
+    if (currentSnapshot == null) {
+      return;
+    }
+
+    for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
+      final MutableRate rate = entry.getValue();
+      final LinkedBlockingDeque<SumAndCount> deque = averages.computeIfAbsent(
+          entry.getKey(),
+          new Function<String, LinkedBlockingDeque<SumAndCount>>() {
+            @Override
+            public LinkedBlockingDeque<SumAndCount> apply(String k) {
+              return new LinkedBlockingDeque<SumAndCount>(numWindows);
+            }
+          });
+      final SumAndCount sumAndCount = new SumAndCount(
+          rate.lastStat().total(),
+          rate.lastStat().numSamples());
+      /* put newest sum and count to the end */
+      if (!deque.offerLast(sumAndCount)) {
+        deque.pollFirst();
+        deque.offerLast(sumAndCount);
+      }
+    }
+
+    setChanged();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (scheduledTask != null) {
+      scheduledTask.cancel(false);
+    }
+    scheduledTask = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
new file mode 100644
index 0000000..899d98c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.eq;
+
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+
+/**
+ * This class tests various cases of the algorithms implemented in
+ * {@link RollingAverages}.
+ */
+public class TestRollingAverages {
+  /**
+   * Tests if the results are correct if no samples are inserted, dry run of
+   * empty roll over.
+   */
+  @Test(timeout = 30000)
+  public void testRollingAveragesEmptyRollover() throws Exception {
+    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    /* 5s interval and 2 windows */
+    try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
+      /* Check it initially */
+      rollingAverages.snapshot(rb, true);
+      verify(rb, never()).addGauge(
+          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+      verify(rb, never()).addGauge(
+          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+
+      /* sleep 6s longer than 5s interval to wait for rollover done */
+      Thread.sleep(6000);
+      rollingAverages.snapshot(rb, false);
+      verify(rb, never()).addGauge(
+          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+      verify(rb, never()).addGauge(
+          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+    }
+  }
+
+  /**
+   * Tests the case:
+   * <p>
+   * 5s interval and 2 sliding windows
+   * </p>
+   * <p>
+   * sample stream: 1000 times 1, 2, and 3, respectively, e.g. [1, 1...1], [2,
+   * 2...2] and [3, 3...3]
+   * </p>
+   */
+  @Test(timeout = 30000)
+  public void testRollingAveragesRollover() throws Exception {
+    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    final String name = "foo2";
+    final int windowSize = 5; // 5s roll over interval
+    final int numWindows = 2;
+    final int numOpsPerIteration = 1000;
+    try (RollingAverages rollingAverages = new RollingAverages(windowSize,
+        numWindows)) {
+
+      /* Push values for three intervals */
+      final long start = Time.monotonicNow();
+      for (int i = 1; i <= 3; i++) {
+        /* insert value */
+        for (long j = 1; j <= numOpsPerIteration; j++) {
+          rollingAverages.add(name, i);
+        }
+
+        /**
+         * Sleep until 1s after the next windowSize seconds interval, to let the
+         * metrics roll over
+         */
+        final long sleep = (start + (windowSize * 1000 * i) + 1000)
+            - Time.monotonicNow();
+        Thread.sleep(sleep);
+
+        /* Verify that the window reset, check it has the values we pushed in */
+        rollingAverages.snapshot(rb, false);
+
+        /*
+         * #1 window with a series of 1 1000
+         * times, e.g. [1, 1...1], similarly, #2 window, e.g. [2, 2...2],
+         * #3 window, e.g. [3, 3...3]
+         */
+        final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0)
+            + numOpsPerIteration * i;
+        /* one empty window or all 2 windows full */
+        final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
+            : numOpsPerIteration;
+        verify(rb).addGauge(
+            info("Foo2RollingAvgTime", "Rolling average time for foo2"),
+            rollingSum / rollingTotal);
+
+        /* Verify the metrics were added the right number of times */
+        verify(rb, times(i)).addGauge(
+            eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
+            anyDouble());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 15bb0bd..50217a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -457,6 +457,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_METRICS_SESSION_ID_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
   public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
+  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
+      "dfs.metrics.rolling.average.window.size";
+  public static final int     DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
+      3600;
+  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
+      "dfs.metrics.rolling.average.window.numbers";
+  public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
+      48;
+  public static final String  DFS_DATANODE_PEER_STATS_ENABLED_KEY =
+      "dfs.datanode.peer.stats.enabled";
+  public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
   public static final String  DFS_DATANODE_HOST_NAME_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
   public static final String  DFS_NAMENODE_CHECKPOINT_DIR_KEY =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 23cd44d..b3aee11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -93,6 +93,7 @@ class BlockReceiver implements Closeable {
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
+  private String bracketedMirrorAddr;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
   private DataTransferThrottler throttler;
@@ -119,6 +120,7 @@ class BlockReceiver implements Closeable {
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
+  private boolean isPenultimateNode = false;
 
   private boolean syncOnClose;
   private long restartBudget;
@@ -575,6 +577,7 @@ class BlockReceiver implements Closeable {
         DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
             mirrorAddr,
             duration);
+        trackSendPacketToLastNodeInPipeline(duration);
         if (duration > datanodeSlowLogThresholdMs) {
           LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
               + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -822,6 +825,33 @@ class BlockReceiver implements Closeable {
     return lastPacketInBlock?-1:len;
   }
 
+  /**
+   * Only tracks the latency of sending packet to the last node in pipeline.
+   * This is a conscious design choice.
+   * <p>
+   * In the case of pipeline [dn0, dn1, dn2], 5ms latency from dn0 to dn1, 100ms
+   * from dn1 to dn2, NameNode claims dn2 is slow since it sees 100ms latency to
+   * dn2. Note that NameNode is not ware of pipeline structure in this context
+   * and only sees latency between two DataNodes.
+   * </p>
+   * <p>
+   * In another case of the same pipeline, 100ms latency from dn0 to dn1, 5ms
+   * from dn1 to dn2, NameNode will miss detecting dn1 being slow since it's not
+   * the last node. However the assumption is that in a busy enough cluster
+   * there are many other pipelines where dn1 is the last node, e.g. [dn3, dn4,
+   * dn1]. Also our tracking interval is relatively long enough (at least an
+   * hour) to improve the chances of the bad DataNodes being the last nodes in
+   * multiple pipelines.
+   * </p>
+   */
+  private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
+    if (isPenultimateNode && mirrorAddr != null) {
+      datanode.getPeerMetrics().addSendPacketDownstream(
+          bracketedMirrorAddr,
+          elapsedMs);
+    }
+  }
+
   private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
     return Arrays.copyOfRange(array, end - size, end);
   }
@@ -886,7 +916,7 @@ class BlockReceiver implements Closeable {
     ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
         .getRestartOOBStatus());
   }
-  
+
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode
@@ -895,14 +925,16 @@ class BlockReceiver implements Closeable {
       DatanodeInfo[] downstreams,
       boolean isReplaceBlock) throws IOException {
 
-      syncOnClose = datanode.getDnConf().syncOnClose;
-      boolean responderClosed = false;
-      mirrorOut = mirrOut;
-      mirrorAddr = mirrAddr;
-      throttler = throttlerArg;
+    syncOnClose = datanode.getDnConf().syncOnClose;
+    boolean responderClosed = false;
+    mirrorOut = mirrOut;
+    mirrorAddr = mirrAddr;
+    bracketedMirrorAddr = "[" + mirrAddr + "]";
+    isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
+    throttler = throttlerArg;
 
-      this.replyOut = replyOut;
-      this.isReplaceBlock = isReplaceBlock;
+    this.replyOut = replyOut;
+    this.isReplaceBlock = isReplaceBlock;
 
     try {
       if (isClient && !isTransfer) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a94c4b1..4436e58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -165,6 +165,7 @@ import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
 import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
 import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
@@ -333,6 +334,7 @@ public class DataNode extends ReconfigurableBase
   private int infoSecurePort;
 
   DataNodeMetrics metrics;
+  private DataNodePeerMetrics peerMetrics;
   private InetSocketAddress streamingAddr;
   
   // See the note below in incrDatanodeNetworkErrors re: concurrency.
@@ -1360,6 +1362,7 @@ public class DataNode extends ReconfigurableBase
     initIpcServer();
 
     metrics = DataNodeMetrics.create(getConf(), getDisplayName());
+    peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
     ecWorker = new ErasureCodingWorker(getConf(), this);
@@ -1755,11 +1758,15 @@ public class DataNode extends ReconfigurableBase
       throw new IOException(ie.getMessage());
     }
   }
-    
+
   public DataNodeMetrics getMetrics() {
     return metrics;
   }
   
+  public DataNodePeerMetrics getPeerMetrics() {
+    return peerMetrics;
+  }
+
   /** Ensure the authentication method is kerberos */
   private void checkKerberosAuthMethod(String msg) throws IOException {
     // User invoking the call must be same as the datanode user
@@ -3437,4 +3444,9 @@ public class DataNode extends ReconfigurableBase
   void setBlockScanner(BlockScanner blockScanner) {
     this.blockScanner = blockScanner;
   }
+
+  @Override // DataNodeMXBean
+  public String getSendPacketDownstreamAvgInfo() {
+    return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 37f9635..ccc5f92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -125,4 +125,16 @@ public interface DataNodeMXBean {
    * Gets the {@link FileIoProvider} statistics.
    */
   String getFileIoProviderStatistics();
+
+  /**
+   * Gets the average info (e.g. time) of SendPacketDownstream when the DataNode
+   * acts as the penultimate (2nd to the last) node in pipeline.
+   * <p>
+   * Example Json:
+   * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+   *  "[49.236.149.246:9801]RollingAvgTime":504.463,
+   *  "[84.125.113.65:9801]RollingAvgTime":497.954}
+   * </p>
+   */
+  String getSendPacketDownstreamAvgInfo();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a35a5b4..abcaa4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -323,6 +323,7 @@ class DataXceiver extends Receiver implements Runnable {
         LOG.error(s, t);
       }
     } finally {
+      collectThreadLocalStates();
       if (LOG.isDebugEnabled()) {
         LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
             + datanode.getXceiverCount());
@@ -335,6 +336,14 @@ class DataXceiver extends Receiver implements Runnable {
     }
   }
 
+  /**
+   * In this short living thread, any local states should be collected before
+   * the thread dies away.
+   */
+  private void collectThreadLocalStates() {
+    datanode.getPeerMetrics().collectThreadLocalStates();
+  }
+
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> token,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
new file mode 100644
index 0000000..9344d1b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsJsonBuilder;
+import org.apache.hadoop.metrics2.lib.RollingAverages;
+
+/**
+ * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
+ * various peer operations.
+ */
+@InterfaceAudience.Private
+public class DataNodePeerMetrics {
+
+  static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
+
+  private final RollingAverages sendPacketDownstreamRollingAvgerages;
+
+  private final String name;
+  private final boolean peerStatsEnabled;
+
+  public DataNodePeerMetrics(
+      final String name,
+      final int windowSize,
+      final int numWindows,
+      final boolean peerStatsEnabled) {
+    this.name = name;
+    this.peerStatsEnabled = peerStatsEnabled;
+    sendPacketDownstreamRollingAvgerages = new RollingAverages(
+        windowSize,
+        numWindows);
+  }
+
+  public String name() {
+    return name;
+  }
+
+  /**
+   * Creates an instance of DataNodePeerMetrics, used for registration.
+   */
+  public static DataNodePeerMetrics create(Configuration conf, String dnName) {
+    final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
+        ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
+        : dnName.replace(':', '-'));
+
+    final int windowSize = conf.getInt(
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
+    final int numWindows = conf.getInt(
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
+    final boolean peerStatsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+
+    return new DataNodePeerMetrics(
+        name,
+        windowSize,
+        numWindows,
+        peerStatsEnabled);
+  }
+
+  /**
+   * Adds invocation and elapsed time of SendPacketDownstream for peer.
+   * <p>
+   * The caller should pass in a well-formatted peerAddr. e.g.
+   * "[192.168.1.110:1010]" is good. This will be translated into a full
+   * qualified metric name, e.g. "[192.168.1.110:1010]AvgTime".
+   * </p>
+   */
+  public void addSendPacketDownstream(
+      final String peerAddr,
+      final long elapsedMs) {
+    if (peerStatsEnabled) {
+      sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
+    }
+  }
+
+  /**
+   * Dump SendPacketDownstreamRollingAvgTime metrics as JSON.
+   */
+  public String dumpSendPacketDownstreamAvgInfoAsJson() {
+    final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
+    sendPacketDownstreamRollingAvgerages.snapshot(builder, true);
+    return builder.toString();
+  }
+
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  public void collectThreadLocalStates() {
+    sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 086f667..3389d84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1972,6 +1972,31 @@
 </property>
 
 <property>
+  <name>dfs.datanode.peer.stats.enabled</name>
+  <value>false</value>
+  <description>
+    A switch to turn on/off tracking DataNode peer statistics.
+  </description>
+</property>
+
+<property>
+  <name>dfs.metrics.rolling.average.window.size</name>
+  <value>3600</value>
+  <description>
+    The number of seconds of each window for which sub set of samples are gathered
+    to compute the rolling average, A.K.A. roll over interval.
+  </description>
+</property>
+
+<property>
+  <name>dfs.metrics.rolling.average.window.numbers</name>
+  <value>48</value>
+  <description>
+    The number of windows maintained to compute the rolling average.
+  </description>
+</property>
+
+<property>
   <name>hadoop.user.group.metrics.percentiles.intervals</name>
   <value></value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
new file mode 100644
index 0000000..5af54a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+/**
+ * This class tests various cases of DataNode peer metrics.
+ */
+public class TestDataNodePeerMetrics {
+
+  @Test(timeout = 30000)
+  public void testGetSendPacketDownstreamAvgInfo() throws Exception {
+    final int windowSize = 5; // 5s roll over interval
+    final int numWindows = 2; // 2 rolling windows
+    final int iterations = 3;
+    final int numOpsPerIteration = 1000;
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+        windowSize);
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+        numWindows);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
+
+    final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
+        conf,
+        "Sample-DataNode");
+    final long start = Time.monotonicNow();
+    for (int i = 1; i <= iterations; i++) {
+      final String peerAddr = genPeerAddress();
+      for (int j = 1; j <= numOpsPerIteration; j++) {
+        /* simulate to get latency of 1 to 1000 ms */
+        final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
+        peerMetrics.addSendPacketDownstream(peerAddr, latency);
+      }
+
+      /**
+       * Sleep until 1s after the next windowSize seconds interval, to let the
+       * metrics roll over
+       */
+      final long sleep = (start + (windowSize * 1000 * i) + 1000)
+          - Time.monotonicNow();
+      Thread.sleep(sleep);
+
+      /* dump avg info */
+      final String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+      /*
+       * example json:
+       * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+       *  "[49.236.149.246:9801]RollingAvgTime":504.463,
+       *  "[84.125.113.65:9801]RollingAvgTime":497.954}
+       */
+      assertThat(json, containsString(peerAddr));
+    }
+  }
+
+  /**
+   * Simulates to generate different peer addresses, e.g. [84.125.113.65:9801].
+   */
+  private String genPeerAddress() {
+    final  ThreadLocalRandom r = ThreadLocalRandom.current();
+    return String.format("[%d.%d.%d.%d:9801]",
+        r.nextInt(1, 256), r.nextInt(1, 256),
+        r.nextInt(1, 256), r.nextInt(1, 256));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[04/14] hadoop git commit: MAPREDUCE-6704. Update the documents to run MapReduce application. Contributed by Bibin A Chundatt.

Posted by xg...@apache.org.
MAPREDUCE-6704. Update the documents to run MapReduce application. Contributed by Bibin A Chundatt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22befbd5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22befbd5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22befbd5

Branch: refs/heads/YARN-5734
Commit: 22befbd585f65934e1d9ae5782a8f961192c0750
Parents: 38e66d4
Author: Akira Ajisaka <aa...@apache.org>
Authored: Fri Dec 23 04:48:04 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Fri Dec 23 04:48:04 2016 +0900

----------------------------------------------------------------------
 .../hadoop-common/src/site/markdown/ClusterSetup.md     |  1 +
 .../hadoop-common/src/site/markdown/SingleCluster.md.vm | 12 ++++--------
 2 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/22befbd5/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md b/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
index 56b43e6..e2ccbf0 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
@@ -156,6 +156,7 @@ This section deals with important parameters to be specified in the given config
 | `yarn.nodemanager.remote-app-log-dir` | */logs* | HDFS directory where the application logs are moved on application completion. Need to set appropriate permissions. Only applicable if log-aggregation is enabled. |
 | `yarn.nodemanager.remote-app-log-dir-suffix` | *logs* | Suffix appended to the remote log dir. Logs will be aggregated to ${yarn.nodemanager.remote-app-log-dir}/${user}/${thisParam} Only applicable if log-aggregation is enabled. |
 | `yarn.nodemanager.aux-services` | mapreduce\_shuffle | Shuffle service that needs to be set for Map Reduce applications. |
+| `yarn.nodemanager.env-whitelist` | Environment properties to be inherited by containers from NodeManagers | For mapreduce application in addition to the default values HADOOP\_MAPRED_HOME should to be added. Property value should JAVA\_HOME,HADOOP\_COMMON\_HOME,HADOOP\_HDFS\_HOME,HADOOP\_CONF\_DIR,CLASSPATH\_PREPEND\_DISTCACHE,HADOOP\_YARN\_HOME,HADOOP\_MAPRED\_HOME |
 
   * Configurations for History Server (Needs to be moved elsewhere):
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22befbd5/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm b/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
index 4825e00..fb4df9a 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
@@ -190,14 +190,6 @@ The following instructions assume that 1. ~ 4. steps of [the above instructions]
                 <name>mapreduce.framework.name</name>
                 <value>yarn</value>
             </property>
-            <property>
-                <name>mapreduce.admin.user.env</name>
-                <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
-            </property>
-            <property>
-                <name>yarn.app.mapreduce.am.env</name>
-                <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
-            </property>
         </configuration>
 
     `etc/hadoop/yarn-site.xml`:
@@ -207,6 +199,10 @@ The following instructions assume that 1. ~ 4. steps of [the above instructions]
                 <name>yarn.nodemanager.aux-services</name>
                 <value>mapreduce_shuffle</value>
             </property>
+            <property>
+                <name>yarn.nodemanager.env-whitelist</name>
+                <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
+            </property>
         </configuration>
 
 2.  Start ResourceManager daemon and NodeManager daemon:


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[11/14] hadoop git commit: HDFS-11272. Refine the assert messages in TestFSDirAttrOp. Contributed by Jimmy Xiang.

Posted by xg...@apache.org.
HDFS-11272. Refine the assert messages in TestFSDirAttrOp. Contributed by Jimmy Xiang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea547529
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea547529
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea547529

Branch: refs/heads/YARN-5734
Commit: ea547529cb17c931a756ebffa6942f71f761ad13
Parents: 8f218ea
Author: Akira Ajisaka <aa...@apache.org>
Authored: Mon Dec 26 17:15:45 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Mon Dec 26 17:15:45 2016 +0900

----------------------------------------------------------------------
 .../hadoop/hdfs/server/namenode/TestFSDirAttrOp.java      | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea547529/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
index 8cd68a1..44cf57a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
@@ -52,25 +52,25 @@ public class TestFSDirAttrOp {
   @Test
   public void testUnprotectedSetTimes() throws Exception {
     // atime < access time + precision
-    assertFalse("SetTimes should not update access time"
+    assertFalse("SetTimes should not update access time "
           + "because it's within the last precision interval",
         unprotectedSetTimes(100, 0, 1000, -1, false));
 
     // atime = access time + precision
-    assertFalse("SetTimes should not update access time"
+    assertFalse("SetTimes should not update access time "
           + "because it's within the last precision interval",
         unprotectedSetTimes(1000, 0, 1000, -1, false));
 
     // atime > access time + precision
-    assertTrue("SetTimes should store access time",
+    assertTrue("SetTimes should update access time",
         unprotectedSetTimes(1011, 10, 1000, -1, false));
 
     // atime < access time + precision, but force is set
-    assertTrue("SetTimes should store access time",
+    assertTrue("SetTimes should update access time",
         unprotectedSetTimes(100, 0, 1000, -1, true));
 
     // atime < access time + precision, but mtime is set
-    assertTrue("SetTimes should store access time",
+    assertTrue("SetTimes should update access time",
         unprotectedSetTimes(100, 0, 1000, 1, false));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[13/14] hadoop git commit: HADOOP-13943. TestCommonConfigurationFields#testCompareXmlAgainstConfigurationClass fails after HADOOP-13863. Contributed by Brahma Reddy Battula.

Posted by xg...@apache.org.
HADOOP-13943. TestCommonConfigurationFields#testCompareXmlAgainstConfigurationClass fails after HADOOP-13863. Contributed by Brahma Reddy Battula.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0665c5f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0665c5f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0665c5f0

Branch: refs/heads/YARN-5734
Commit: 0665c5f09afb2d6f59b6c4d4980f8b1ff9cbe620
Parents: ded2d08
Author: Akira Ajisaka <aa...@apache.org>
Authored: Wed Dec 28 01:39:30 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Wed Dec 28 01:39:30 2016 +0900

----------------------------------------------------------------------
 .../org/apache/hadoop/conf/TestCommonConfigurationFields.java | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0665c5f0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 72316ad..571dfae 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -106,6 +106,13 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("fs.adl.impl");
     xmlPropsToSkipCompare.add("fs.AbstractFileSystem.adl.impl");
 
+    // Azure properties are in a different class
+    // - org.apache.hadoop.fs.azure.AzureNativeFileSystemStore
+    // - org.apache.hadoop.fs.azure.SASKeyGeneratorImpl
+    xmlPropsToSkipCompare.add("fs.azure.sas.expiry.period");
+    xmlPropsToSkipCompare.add("fs.azure.local.sas.key.mode");
+    xmlPropsToSkipCompare.add("fs.azure.secure.mode");
+
     // Deprecated properties.  These should eventually be removed from the
     // class.
     configurationPropsToSkipCompare


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[10/14] hadoop git commit: HDFS-11250. Fix a typo in ReplicaUnderRecovery#setRecoveryID. Contributed by Yiqun Lin.

Posted by xg...@apache.org.
HDFS-11250. Fix a typo in ReplicaUnderRecovery#setRecoveryID. Contributed by Yiqun Lin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f218ea2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f218ea2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f218ea2

Branch: refs/heads/YARN-5734
Commit: 8f218ea2849477bdcb4918586aaefed0cd118341
Parents: 483cd06
Author: Akira Ajisaka <aa...@apache.org>
Authored: Mon Dec 26 16:51:29 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Mon Dec 26 16:51:29 2016 +0900

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f218ea2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 09140e7..324a8ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -65,7 +65,7 @@ public class ReplicaUnderRecovery extends LocalReplica {
     if (recoveryId > this.recoveryId) {
       this.recoveryId = recoveryId;
     } else {
-      throw new IllegalArgumentException("The new rcovery id: " + recoveryId
+      throw new IllegalArgumentException("The new recovery id: " + recoveryId
           + " must be greater than the current one: " + this.recoveryId);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[09/14] hadoop git commit: HDFS-11271. Typo in NameNode UI. Contributed by Wei-Chiu Chuang.

Posted by xg...@apache.org.
HDFS-11271. Typo in NameNode UI. Contributed by Wei-Chiu Chuang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/483cd06a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/483cd06a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/483cd06a

Branch: refs/heads/YARN-5734
Commit: 483cd06ad43b85e00a782fe225f1f40657bab204
Parents: c721f78
Author: Akira Ajisaka <aa...@apache.org>
Authored: Mon Dec 26 16:36:54 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Mon Dec 26 16:36:54 2016 +0900

----------------------------------------------------------------------
 .../hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html             | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/483cd06a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
index 3598c80..d71e798 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
@@ -152,11 +152,11 @@
   {/fs}
 </p>
 {#mem.HeapMemoryUsage}
-<p>Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Heap Memory. Max Heap Memory is {@eq key=max value="-1" type="number"}&ltunbonded&gt{:else}{max|fmt_bytes}{/eq}.</p>
+<p>Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Heap Memory. Max Heap Memory is {@eq key=max value="-1" type="number"}&ltunbounded&gt{:else}{max|fmt_bytes}{/eq}.</p>
 {/mem.HeapMemoryUsage}
 
 {#mem.NonHeapMemoryUsage}
-<p>Non Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Commited Non Heap Memory. Max Non Heap Memory is {@eq key=max value="-1" type="number"}&ltunbonded&gt{:else}{max|fmt_bytes}{/eq}.</p>
+<p>Non Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Commited Non Heap Memory. Max Non Heap Memory is {@eq key=max value="-1" type="number"}&ltunbounded&gt{:else}{max|fmt_bytes}{/eq}.</p>
 {/mem.NonHeapMemoryUsage}
 
 {#nn}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[08/14] hadoop git commit: YARN-6026. A couple of spelling errors in the docs. Contributed by Grant Sohn.

Posted by xg...@apache.org.
YARN-6026. A couple of spelling errors in the docs. Contributed by Grant Sohn.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c721f78a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c721f78a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c721f78a

Branch: refs/heads/YARN-5734
Commit: c721f78a3c6f560dff21ca966443551aba7541a4
Parents: 4d3f73a
Author: Naganarasimha <na...@apache.org>
Authored: Sat Dec 24 05:38:09 2016 +0530
Committer: Naganarasimha <na...@apache.org>
Committed: Sat Dec 24 05:38:09 2016 +0530

----------------------------------------------------------------------
 .../hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md      | 2 +-
 .../hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c721f78a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
index a72d7e4..94d075b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
@@ -2286,7 +2286,7 @@ The *maximum-resource-capabilites* object contains the following elements:
 
 | Item | Data Type | Description |
 |:---- |:---- |:---- |
-| memory | int | The maxiumim memory available for a container |
+| memory | int | The maximum memory available for a container |
 | vCores | int | The maximum number of cores available for a container |
 
 ### Response Examples

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c721f78a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
index 6b7bd08..e801e14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
@@ -334,7 +334,7 @@ After creating the timeline client, user also needs to set the timeline collecto
 
     amRMClient.registerTimelineClient(timelineClient)\u037e
 
-Else address needs to be retreived from the AM allocate response and need to be set in timeline client explicitly.
+Else address needs to be retrieved from the AM allocate response and need to be set in timeline client explicitly.
 
     timelineClient.setTimelineServiceAddress(response.getCollectorAddr());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[03/14] hadoop git commit: YARN-5903. Fix race condition in TestResourceManagerAdministrationProtocolPBClientImpl beforeclass setup method (Haibo Chen via Varun Saxena)

Posted by xg...@apache.org.
YARN-5903. Fix race condition in TestResourceManagerAdministrationProtocolPBClientImpl beforeclass setup method (Haibo Chen via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/38e66d4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/38e66d4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/38e66d4d

Branch: refs/heads/YARN-5734
Commit: 38e66d4d64f3c2e2bb43d8e5dca3866d672322b6
Parents: 56a13a6
Author: Varun Saxena <va...@apache.org>
Authored: Thu Dec 22 23:08:33 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Thu Dec 22 23:08:33 2016 +0530

----------------------------------------------------------------------
 ...nagerAdministrationProtocolPBClientImpl.java | 33 ++++++++++++++------
 1 file changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/38e66d4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
index c3dd93d..fd83028 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
@@ -19,11 +19,15 @@ package org.apache.hadoop.yarn.client;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.service.ServiceStateChangeListener;
 import org.apache.hadoop.yarn.api.records.DecommissionType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -77,22 +82,30 @@ public class TestResourceManagerAdministrationProtocolPBClientImpl {
       protected void doSecureLogin() throws IOException {
       }
     };
+
+    // a reliable way to wait for resource manager to fully start
+    final CountDownLatch rmStartedSignal = new CountDownLatch(1);
+    ServiceStateChangeListener rmStateChangeListener =
+        new ServiceStateChangeListener() {
+          @Override
+          public void stateChanged(Service service) {
+            if (service.getServiceState() == STATE.STARTED) {
+              rmStartedSignal.countDown();
+            }
+          }
+        };
+    resourceManager.registerServiceListener(rmStateChangeListener);
+
     resourceManager.init(configuration);
     new Thread() {
       public void run() {
         resourceManager.start();
       }
     }.start();
-    int waitCount = 0;
-    while (resourceManager.getServiceState() == STATE.INITED
-            && waitCount++ < 10) {
-      LOG.info("Waiting for RM to start...");
-      Thread.sleep(1000);
-    }
-    if (resourceManager.getServiceState() != STATE.STARTED) {
-      throw new IOException("ResourceManager failed to start. Final state is "
-              + resourceManager.getServiceState());
-    }
+
+    boolean rmStarted = rmStartedSignal.await(60000L, TimeUnit.MILLISECONDS);
+    Assert.assertTrue("ResourceManager failed to start up.", rmStarted);
+
     LOG.info("ResourceManager RMAdmin address: "
             + configuration.get(YarnConfiguration.RM_ADMIN_ADDRESS));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[02/14] hadoop git commit: HDFS-11216. Add remoteBytesRead counter metrics for erasure coding reconstruction task. Contributed by Sammi Chen

Posted by xg...@apache.org.
HDFS-11216. Add remoteBytesRead counter metrics for erasure coding reconstruction task. Contributed by Sammi Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56a13a6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56a13a6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56a13a6a

Branch: refs/heads/YARN-5734
Commit: 56a13a6a59cb128cf6fdac78a074faf7e5603967
Parents: ae40153
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Dec 22 14:18:54 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Thu Dec 22 14:18:54 2016 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/test/MetricsAsserts.java  |  7 +++++++
 .../erasurecode/StripedBlockReader.java         | 11 +++++++---
 .../erasurecode/StripedBlockReconstructor.java  |  1 +
 .../erasurecode/StripedReconstructor.java       | 14 +++++++++++--
 .../datanode/metrics/DataNodeMetrics.java       |  6 ++++++
 .../TestDataNodeErasureCodingMetrics.java       | 21 +++++++++++++++++++-
 6 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
index 5d87b07..a7bbe84 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
@@ -236,6 +236,13 @@ public class MetricsAsserts {
     return captor.getValue();
   }
 
+  public static long getLongCounterWithoutCheck(String name,
+    MetricsRecordBuilder rb) {
+    ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
+    verify(rb, atLeast(0)).addCounter(eqName(info(name, "")), captor.capture());
+    return captor.getValue();
+  }
+
   public static String getStringMetric(String name, MetricsRecordBuilder rb) {
     ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
     verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 0f7c5c7..556158c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -65,6 +65,7 @@ class StripedBlockReader {
   private final DatanodeInfo source;
   private BlockReader blockReader;
   private ByteBuffer buffer;
+  private boolean isLocal;
 
   StripedBlockReader(StripedReader stripedReader, DataNode datanode,
                      Configuration conf, short index, ExtendedBlock block,
@@ -76,6 +77,7 @@ class StripedBlockReader {
     this.index = index;
     this.source = source;
     this.block = block;
+    this.isLocal = false;
 
     BlockReader tmpBlockReader = createBlockReader(offsetInBlock);
     if (tmpBlockReader != null) {
@@ -116,10 +118,13 @@ class StripedBlockReader {
          *
          * TODO: add proper tracer
          */
+      Peer peer = newConnectedPeer(block, dnAddr, blockToken, source);
+      if (peer.isLocal()) {
+        this.isLocal = true;
+      }
       return BlockReaderRemote.newBlockReader(
           "dummy", block, blockToken, offsetInBlock,
-          block.getNumBytes() - offsetInBlock, true,
-          "", newConnectedPeer(block, dnAddr, blockToken, source), source,
+          block.getNumBytes() - offsetInBlock, true, "", peer, source,
           null, stripedReader.getCachingStrategy(), datanode.getTracer(), -1);
     } catch (IOException e) {
       LOG.info("Exception while creating remote block reader, datanode {}",
@@ -187,7 +192,7 @@ class StripedBlockReader {
         break;
       }
       n += nread;
-      stripedReader.getReconstructor().incrBytesRead(nread);
+      stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index 5554d68..a1da536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -70,6 +70,7 @@ class StripedBlockReconstructor extends StripedReconstructor
       final DataNodeMetrics metrics = getDatanode().getMetrics();
       metrics.incrECReconstructionTasks();
       metrics.incrECReconstructionBytesRead(getBytesRead());
+      metrics.incrECReconstructionRemoteBytesRead(getRemoteBytesRead());
       metrics.incrECReconstructionBytesWritten(getBytesWritten());
       getStripedReader().close();
       stripedWriter.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 68769f7..cd17864 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -118,6 +118,7 @@ abstract class StripedReconstructor {
   // metrics
   private AtomicLong bytesRead = new AtomicLong(0);
   private AtomicLong bytesWritten = new AtomicLong(0);
+  private AtomicLong remoteBytesRead = new AtomicLong(0);
 
   StripedReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo) {
@@ -138,8 +139,13 @@ abstract class StripedReconstructor {
     positionInBlock = 0L;
   }
 
-  public void incrBytesRead(long delta) {
-    bytesRead.addAndGet(delta);
+  public void incrBytesRead(boolean local, long delta) {
+    if (local) {
+      bytesRead.addAndGet(delta);
+    } else {
+      bytesRead.addAndGet(delta);
+      remoteBytesRead.addAndGet(delta);
+    }
   }
 
   public void incrBytesWritten(long delta) {
@@ -150,6 +156,10 @@ abstract class StripedReconstructor {
     return bytesRead.get();
   }
 
+  public long getRemoteBytesRead() {
+    return remoteBytesRead.get();
+  }
+
   public long getBytesWritten() {
     return bytesWritten.get();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index e09a85f..0d82fed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -140,6 +140,8 @@ public class DataNodeMetrics {
   MutableCounterLong ecReconstructionBytesRead;
   @Metric("Bytes written by erasure coding worker")
   MutableCounterLong ecReconstructionBytesWritten;
+  @Metric("Bytes remote read by erasure coding worker")
+  MutableCounterLong ecReconstructionRemoteBytesRead;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -459,6 +461,10 @@ public class DataNodeMetrics {
     ecReconstructionBytesRead.incr(bytes);
   }
 
+  public void incrECReconstructionRemoteBytesRead(long bytes) {
+    ecReconstructionRemoteBytesRead.incr(bytes);
+  }
+
   public void incrECReconstructionBytesWritten(long bytes) {
     ecReconstructionBytesWritten.incr(bytes);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
index 64ddbd7..7e64214 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounterWithoutCheck;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -98,6 +99,8 @@ public class TestDataNodeErasureCodingMetrics {
         blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
     Assert.assertEquals("EcReconstructionBytesWritten should be ",
         blockSize, getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   // A partial block, reconstruct the partial block
@@ -110,6 +113,8 @@ public class TestDataNodeErasureCodingMetrics {
         fileLen,  getLongMetric("EcReconstructionBytesRead"));
     Assert.assertEquals("EcReconstructionBytesWritten should be ",
         fileLen, getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   // 1 full block + 5 partial block, reconstruct the full block
@@ -121,8 +126,10 @@ public class TestDataNodeErasureCodingMetrics {
     Assert.assertEquals("ecReconstructionBytesRead should be ",
         cellSize * dataBlocks + cellSize + cellSize / 10,
         getLongMetric("EcReconstructionBytesRead"));
-    Assert.assertEquals("ecReconstructionBytesWritten should be ",
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
         blockSize, getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   // 1 full block + 5 partial block, reconstruct the partial block
@@ -137,6 +144,8 @@ public class TestDataNodeErasureCodingMetrics {
     Assert.assertEquals("ecReconstructionBytesWritten should be ",
         cellSize + cellSize / 10,
         getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   private long getLongMetric(String metricName) {
@@ -149,6 +158,16 @@ public class TestDataNodeErasureCodingMetrics {
     return metricValue;
   }
 
+  private long getLongMetricWithoutCheck(String metricName) {
+    long metricValue = 0;
+    // Add all reconstruction metric value from all data nodes
+    for (DataNode dn : cluster.getDataNodes()) {
+      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+      metricValue += getLongCounterWithoutCheck(metricName, rb);
+    }
+    return metricValue;
+  }
+
   private void doTest(String fileName, int fileLen,
       int deadNodeIndex) throws Exception {
     assertTrue(fileLen > 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[12/14] hadoop git commit: HADOOP-13940. Document the missing envvars commands (Contributed by Yiqun Lin via Daniel Templeton)

Posted by xg...@apache.org.
HADOOP-13940. Document the missing envvars commands (Contributed by Yiqun Lin via Daniel Templeton)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ded2d08f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ded2d08f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ded2d08f

Branch: refs/heads/YARN-5734
Commit: ded2d08f33c25daf17dbf3e5ff0ddfcf9980c6aa
Parents: ea54752
Author: Daniel Templeton <te...@apache.org>
Authored: Tue Dec 27 07:16:37 2016 -0800
Committer: Daniel Templeton <te...@apache.org>
Committed: Tue Dec 27 07:18:08 2016 -0800

----------------------------------------------------------------------
 .../hadoop-common/src/site/markdown/CommandsManual.md          | 6 ++++++
 .../src/site/markdown/MapredCommands.md                        | 6 ++++++
 .../hadoop-yarn-site/src/site/markdown/YarnCommands.md         | 6 ++++++
 3 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded2d08f/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md b/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
index ef76810..696848b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
@@ -223,6 +223,12 @@ Usage: `hadoop CLASSNAME`
 
 Runs the class named `CLASSNAME`. The class must be part of a package.
 
+### `envvars`
+
+Usage: `hadoop envvars`
+
+Display computed Hadoop environment variables.
+
 Administration Commands
 -----------------------
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded2d08f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
index f312d31..6b7de2b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
@@ -140,6 +140,12 @@ Prints the version.
 
 Usage: `mapred version`
 
+### `envvars`
+
+Usage: `mapred envvars`
+
+Display computed Hadoop environment variables.
+
 Administration Commands
 -----------------------
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded2d08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
index 1d51b1f..56096f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
@@ -166,6 +166,12 @@ Usage: `yarn version`
 
 Prints the Hadoop version.
 
+### `envvars`
+
+Usage: `yarn envvars`
+
+Display computed Hadoop environment variables.
+
 Administration Commands
 -----------------------
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org