Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/27 19:52:09 UTC
[01/14] hadoop git commit: YARN-4994. Use MiniYARNCluster with try-with-resources in tests. Contributed by Andras Bokor.
Repository: hadoop
Updated Branches:
refs/heads/YARN-5734 736f54b72 -> c0e0ef296
YARN-4994. Use MiniYARNCluster with try-with-resources in tests. Contributed by Andras Bokor.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae401539
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae401539
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae401539
Branch: refs/heads/YARN-5734
Commit: ae401539eaf7745ec8690f9281726fb4cdcdbe94
Parents: 736f54b
Author: Akira Ajisaka <aa...@apache.org>
Authored: Thu Dec 22 14:32:24 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Thu Dec 22 14:32:24 2016 +0900
----------------------------------------------------------------------
.../jobhistory/TestJobHistoryEventHandler.java | 10 +--
.../hadoop/tools/TestHadoopArchiveLogs.java | 12 +--
.../tools/TestHadoopArchiveLogsRunner.java | 11 +--
...stHedgingRequestRMFailoverProxyProvider.java | 57 +++++++-------
.../yarn/client/api/impl/TestAMRMProxy.java | 37 +++------
.../hadoop/yarn/client/cli/TestYarnCLI.java | 15 +---
.../hadoop/yarn/server/TestMiniYarnCluster.java | 80 +++++++++-----------
7 files changed, 85 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
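The change applies one pattern across all seven test classes: MiniYARNCluster is a Hadoop Service and therefore java.io.Closeable, so it can be opened in a try-with-resources statement that stops the cluster even when an assertion fails, replacing the hand-written null-check-and-stop() finally blocks. A minimal sketch of the pattern (the test name, cluster sizing, and body below are illustrative, not taken from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;

public class TryWithResourcesSketch {
  public void runClusterTest() throws Exception {
    Configuration conf = new YarnConfiguration();
    // close() on a Hadoop Service delegates to stop(), so the cluster is
    // shut down automatically when control leaves the try block.
    try (MiniYARNCluster cluster = new MiniYARNCluster(
        "TryWithResourcesSketch", 1, 1, 1, 1)) {
      cluster.init(conf);
      cluster.start();
      // ... assertions against the running cluster ...
    } // cluster.close() runs here, even if an assertion above throws
  }
}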
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 54a2fad..0b33d6b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -563,11 +563,9 @@ public class TestJobHistoryEventHandler {
TestParams t = new TestParams(RunningAppContext.class, false);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- MiniYARNCluster yarnCluster = null;
long currentTime = System.currentTimeMillis();
- try {
- yarnCluster = new MiniYARNCluster(
- TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1);
+ try (MiniYARNCluster yarnCluster = new MiniYARNCluster(
+ TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1)) {
yarnCluster.init(conf);
yarnCluster.start();
Configuration confJHEH = new YarnConfiguration(conf);
@@ -720,10 +718,6 @@ public class TestJobHistoryEventHandler {
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
Assert.assertEquals(TaskType.MAP.toString(),
tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
- } finally {
- if (yarnCluster != null) {
- yarnCluster.stop();
- }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
index d2d7801..5b6062b 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
@@ -165,13 +165,11 @@ public class TestHadoopArchiveLogs {
@Test(timeout = 30000)
public void testFilterAppsByAggregatedStatus() throws Exception {
- MiniYARNCluster yarnCluster = null;
- try {
+ try (MiniYARNCluster yarnCluster =
+ new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(),
+ 1, 1, 1, 1)) {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
- yarnCluster =
- new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
- 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
conf = yarnCluster.getConfig();
@@ -237,10 +235,6 @@ public class TestHadoopArchiveLogs {
Assert.assertTrue(hal.eligibleApplications.contains(app4));
Assert.assertTrue(hal.eligibleApplications.contains(app7));
Assert.assertTrue(hal.eligibleApplications.contains(app8));
- } finally {
- if (yarnCluster != null) {
- yarnCluster.stop();
- }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
index 098e2fd..fad9b97 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
@@ -52,16 +52,14 @@ public class TestHadoopArchiveLogsRunner {
@Test(timeout = 50000)
public void testHadoopArchiveLogs() throws Exception {
- MiniYARNCluster yarnCluster = null;
MiniDFSCluster dfsCluster = null;
FileSystem fs = null;
- try {
+ try (MiniYARNCluster yarnCluster =
+ new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
+ 1, 2, 1, 1)) {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
- yarnCluster =
- new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
- 1, 2, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
conf = yarnCluster.getConfig();
@@ -133,9 +131,6 @@ public class TestHadoopArchiveLogsRunner {
harLogs[2].getOwner());
Assert.assertEquals(0, fs.listStatus(workingDir).length);
} finally {
- if (yarnCluster != null) {
- yarnCluster.stop();
- }
if (fs != null) {
fs.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
index 30b409e..b55cad8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
@@ -35,8 +35,6 @@ public class TestHedgingRequestRMFailoverProxyProvider {
@Test
public void testHedgingRequestProxyProvider() throws Exception {
- final MiniYARNCluster cluster =
- new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
@@ -49,41 +47,44 @@ public class TestHedgingRequestRMFailoverProxyProvider {
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
2000);
- HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
- HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
- HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
- HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
- HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
- conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+ try (MiniYARNCluster cluster =
+ new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1)) {
- cluster.init(conf);
- cluster.start();
+ HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
+ HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
+ HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
+ HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
+ HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
- final YarnClient client = YarnClient.createYarnClient();
- client.init(conf);
- client.start();
+ cluster.init(conf);
+ cluster.start();
- // Transition rm5 to active;
- long start = System.currentTimeMillis();
- makeRMActive(cluster, 4);
+ final YarnClient client = YarnClient.createYarnClient();
+ client.init(conf);
+ client.start();
- validateActiveRM(client);
+ // Transition rm5 to active;
+ long start = System.currentTimeMillis();
+ makeRMActive(cluster, 4);
- long end = System.currentTimeMillis();
- System.out.println("Client call succeeded at " + end);
- // should return the response fast
- Assert.assertTrue(end - start <= 10000);
+ validateActiveRM(client);
- // transition rm5 to standby
- cluster.getResourceManager(4).getRMContext().getRMAdminService()
- .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+ long end = System.currentTimeMillis();
+ System.out.println("Client call succeeded at " + end);
+ // should return the response fast
+ Assert.assertTrue(end - start <= 10000);
- makeRMActive(cluster, 2);
+ // transition rm5 to standby
+ cluster.getResourceManager(4).getRMContext().getRMAdminService()
+ .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER));
- validateActiveRM(client);
+ makeRMActive(cluster, 2);
- cluster.stop();
+ validateActiveRM(client);
+
+ }
}
private void validateActiveRM(YarnClient client) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
index 33f7527..9eef9a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
@@ -60,11 +60,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
*/
@Test(timeout = 120000)
public void testAMRMProxyE2E() throws Exception {
- MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
- YarnClient rmClient = null;
ApplicationMasterProtocol client;
- try {
+ try (MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E",
+ 1, 1, 1);
+ YarnClient rmClient = YarnClient.createYarnClient()) {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
cluster.init(conf);
@@ -75,7 +75,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
- rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
@@ -135,11 +134,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
Thread.sleep(500);
Assert.assertNotEquals(RMAppState.FINISHED, rmApp.getState());
- } finally {
- if (rmClient != null) {
- rmClient.stop();
- }
- cluster.stop();
}
}
@@ -150,12 +144,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
*/
@Test(timeout = 120000)
public void testE2ETokenRenewal() throws Exception {
- MiniYARNCluster cluster =
- new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
- YarnClient rmClient = null;
ApplicationMasterProtocol client;
- try {
+ try (MiniYARNCluster cluster =
+ new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
+ YarnClient rmClient = YarnClient.createYarnClient()) {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1500);
@@ -170,7 +163,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
final Configuration yarnConf = cluster.getConfig();
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
- rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
@@ -216,11 +208,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
client.finishApplicationMaster(FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
- } finally {
- if (rmClient != null) {
- rmClient.stop();
- }
- cluster.stop();
}
}
@@ -230,11 +217,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
*/
@Test(timeout = 120000)
public void testE2ETokenSwap() throws Exception {
- MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
- YarnClient rmClient = null;
ApplicationMasterProtocol client;
- try {
+ try (MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap",
+ 1, 1, 1);
+ YarnClient rmClient = YarnClient.createYarnClient()) {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
cluster.init(conf);
@@ -242,7 +229,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
// the client will connect to the RM with the token provided by AMRMProxy
final Configuration yarnConf = cluster.getConfig();
- rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
@@ -260,11 +246,6 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
e.getMessage().startsWith("Invalid AMRMToken from appattempt_"));
}
- } finally {
- if (rmClient != null) {
- rmClient.stop();
- }
- cluster.stop();
}
}
}
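When two resources are opened, as in the TestAMRMProxy tests above, try-with-resources closes them in reverse declaration order: the YarnClient declared second is stopped before the cluster, which is the same ordering the removed finally blocks enforced by hand. A sketch of the two-resource form (names and body illustrative):

try (MiniYARNCluster cluster = new MiniYARNCluster("sketch", 1, 1, 1);
     YarnClient rmClient = YarnClient.createYarnClient()) {
  cluster.init(new YarnConfiguration());
  cluster.start();
  rmClient.init(cluster.getConfig());
  rmClient.start();
  // ... test body ...
} // closes rmClient first, then cluster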
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index a677606..2e90581 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -1721,15 +1721,13 @@ public class TestYarnCLI {
+ "ProportionalCapacityPreemptionPolicy");
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setBoolean(PREFIX + "root.a.a1.disable_preemption", true);
- MiniYARNCluster cluster =
- new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
- YarnClient yarnClient = null;
- try {
+ try (MiniYARNCluster cluster =
+ new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
+ YarnClient yarnClient = YarnClient.createYarnClient()) {
cluster.init(conf);
cluster.start();
final Configuration yarnConf = cluster.getConfig();
- yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
@@ -1742,13 +1740,6 @@ public class TestYarnCLI {
assertEquals(0, result);
Assert.assertTrue(sysOutStream.toString()
.contains("Preemption : disabled"));
- } finally {
- // clean-up
- if (yarnClient != null) {
- yarnClient.stop();
- }
- cluster.stop();
- cluster.close();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae401539/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
index 9226ead..ff7fafc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
public class TestMiniYarnCluster {
@@ -41,10 +42,11 @@ public class TestMiniYarnCluster {
*/
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
enableAHS = false;
- MiniYARNCluster cluster = null;
- try {
- cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
- numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
+ try (MiniYARNCluster cluster =
+ new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
+ numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
+ enableAHS)) {
+
cluster.init(conf);
cluster.start();
@@ -52,11 +54,6 @@ public class TestMiniYarnCluster {
Assert.assertNull("Timeline Service should not have been started",
cluster.getApplicationHistoryServer());
}
- finally {
- if(cluster != null) {
- cluster.stop();
- }
- }
/*
* Timeline service should start if TIMELINE_SERVICE_ENABLED == true
@@ -64,10 +61,10 @@ public class TestMiniYarnCluster {
*/
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
enableAHS = false;
- cluster = null;
- try {
- cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
- numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
+ try (MiniYARNCluster cluster =
+ new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
+ numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
+ enableAHS)) {
cluster.init(conf);
// Verify that the timeline-service starts on ephemeral ports by default
@@ -87,21 +84,16 @@ public class TestMiniYarnCluster {
Assert.assertNotNull("Timeline Service should have been started",
cluster.getApplicationHistoryServer());
}
- finally {
- if(cluster != null) {
- cluster.stop();
- }
- }
/*
* Timeline service should start if TIMELINE_SERVICE_ENABLED == false
* and enableAHS == true
*/
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
enableAHS = true;
- cluster = null;
- try {
- cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
- numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
+ try (MiniYARNCluster cluster =
+ new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
+ numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
+ enableAHS)) {
cluster.init(conf);
cluster.start();
@@ -115,15 +107,10 @@ public class TestMiniYarnCluster {
Assert.assertNotNull("Timeline Service should have been started",
cluster.getApplicationHistoryServer());
}
- finally {
- if(cluster != null) {
- cluster.stop();
- }
- }
}
@Test
- public void testMultiRMConf() {
+ public void testMultiRMConf() throws IOException {
String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2";
int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000;
Configuration conf = new YarnConfiguration();
@@ -137,23 +124,28 @@ public class TestMiniYarnCluster {
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
- MiniYARNCluster cluster =
+ try (MiniYARNCluster cluster =
new MiniYARNCluster(TestMiniYarnCluster.class.getName(),
- 2, 0, 1, 1);
- cluster.init(conf);
- Configuration conf1 = cluster.getResourceManager(0).getConfig(),
- conf2 = cluster.getResourceManager(1).getConfig();
- Assert.assertFalse(conf1 == conf2);
- Assert.assertEquals("0.0.0.0:18032",
- conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
- Assert.assertEquals("0.0.0.0:28032",
- conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
- Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
+ 2, 0, 1, 1)) {
+ cluster.init(conf);
+ Configuration conf1 = cluster.getResourceManager(0).getConfig(),
+ conf2 = cluster.getResourceManager(1).getConfig();
+ Assert.assertFalse(conf1 == conf2);
+ Assert.assertEquals("0.0.0.0:18032",
+ conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+ RM1_NODE_ID)));
+ Assert.assertEquals("0.0.0.0:28032",
+ conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+ RM2_NODE_ID)));
+ Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
- Assert.assertEquals("0.0.0.0:18032",
- conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
- Assert.assertEquals("0.0.0.0:28032",
- conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
- Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
+ Assert.assertEquals("0.0.0.0:18032",
+ conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+ RM1_NODE_ID)));
+ Assert.assertEquals("0.0.0.0:28032",
+ conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
+ RM2_NODE_ID)));
+ Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
+ }
}
}
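One detail in the TestMiniYarnCluster hunk above: java.io.Closeable.close() is declared to throw IOException, so once the cluster is managed by try-with-resources the test method has to propagate it. That is why the patch adds both the java.io.IOException import and throws IOException to testMultiRMConf.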
[05/14] hadoop git commit: HADOOP-13863. Azure: Add a new SAS key mode for WASB. Contributed by Dushyanth
Posted by xg...@apache.org.
HADOOP-13863. Azure: Add a new SAS key mode for WASB. Contributed by Dushyanth
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e92a7709
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e92a7709
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e92a7709
Branch: refs/heads/YARN-5734
Commit: e92a77099b91620cee84513cc879089907468075
Parents: 22befbd
Author: Mingliang Liu <li...@apache.org>
Authored: Thu Dec 22 17:33:25 2016 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Thu Dec 22 20:15:18 2016 -0800
----------------------------------------------------------------------
.../src/main/resources/core-default.xml | 33 +-
.../fs/azure/AzureNativeFileSystemStore.java | 84 ++-
.../fs/azure/LocalSASKeyGeneratorImpl.java | 263 +++++++++
.../fs/azure/RemoteSASKeyGeneratorImpl.java | 296 ++++++++++
.../fs/azure/SASKeyGenerationException.java | 40 ++
.../hadoop/fs/azure/SASKeyGeneratorImpl.java | 61 ++
.../fs/azure/SASKeyGeneratorInterface.java | 64 +++
.../hadoop/fs/azure/SecureModeException.java | 40 ++
.../fs/azure/SecureStorageInterfaceImpl.java | 565 +++++++++++++++++++
.../fs/azure/WasbRemoteCallException.java | 41 ++
.../hadoop/fs/azure/WasbRemoteCallHelper.java | 93 +++
.../hadoop-azure/src/site/markdown/index.md | 48 ++
.../hadoop/fs/azure/TestContainerChecks.java | 12 +-
.../fs/azure/TestWasbUriAndConfiguration.java | 17 +-
.../src/test/resources/azure-test.xml | 15 +-
15 files changed, 1660 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
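The two new flags compose as follows: fs.azure.secure.mode switches WASB from account-key access to SAS-based access, and fs.azure.local.sas.key.mode then chooses between remote and in-process key generation (it has no effect while secure mode is off). A minimal sketch of enabling local SAS key mode programmatically, using the keys introduced by this patch; the account and container names are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
// Make NativeAzureFileSystem talk to Azure storage via SAS keys.
conf.setBoolean("fs.azure.secure.mode", true);
// Generate the SAS keys in-process instead of calling a remote cred service.
conf.setBoolean("fs.azure.local.sas.key.mode", true);
// Optional: shorten the SAS key expiry from the 90-day default.
conf.set("fs.azure.sas.expiry.period", "7d");
FileSystem fs = FileSystem.get(
    URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"), conf);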
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 05004c1..b4a34db 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1222,6 +1222,38 @@
<description>The implementation class of the S3A AbstractFileSystem.</description>
</property>
+<!-- Azure file system properties -->
+<property>
+ <name>fs.azure.secure.mode</name>
+ <value>false</value>
+ <description>
+ Config flag to identify the mode in which fs.azure.NativeAzureFileSystem
+ should run. Setting it to "true" makes fs.azure.NativeAzureFileSystem use
+ SAS keys to communicate with Azure storage.
+ </description>
+</property>
+<property>
+ <name>fs.azure.local.sas.key.mode</name>
+ <value>false</value>
+ <description>
+ Works in conjunction with fs.azure.secure.mode. Setting this config to true
+ results in fs.azure.NativeAzureFileSystem using the local SAS key generation
+ mode, where the SAS keys are generated in the same process as fs.azure.NativeAzureFileSystem.
+ If the fs.azure.secure.mode flag is set to false, this flag has no effect.
+ </description>
+</property>
+<property>
+ <name>fs.azure.sas.expiry.period</name>
+ <value>90d</value>
+ <description>
+ The default expiration period for generated SAS keys.
+ The following suffixes can be used (case insensitive):
+ ms(millis), s(sec), m(min), h(hour), d(day)
+ to specify the time (such as 2s, 2m, 1h, etc.).
+ </description>
+</property>
+
+
<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
@@ -2420,5 +2452,4 @@
in audit logs.
</description>
</property>
-
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index d232a2d..07c389c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
-import org.apache.hadoop.fs.azure.StorageInterfaceImpl.CloudPageBlobWrapperImpl;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
@@ -150,6 +149,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final String KEY_ENABLE_STORAGE_CLIENT_LOGGING = "fs.azure.storage.client.logging";
+ /**
+ * Configuration key to identify whether WASB needs to run in Secure mode. In Secure mode
+ * all interactions with Azure storage are performed using SAS URIs. There are two sub-modes
+ * within Secure mode: remote SAS key mode, where the SAS keys are generated by
+ * a remote process, and local mode, where SAS keys are generated within WASB.
+ */
+ @VisibleForTesting
+ public static final String KEY_USE_SECURE_MODE = "fs.azure.secure.mode";
+
+ /**
+ * By default the SAS key mode is expected to run in remote key mode. This flag sets it
+ * to run in local mode.
+ */
+ public static final String KEY_USE_LOCAL_SAS_KEY_MODE = "fs.azure.local.sas.key.mode";
+
private static final String PERMISSION_METADATA_KEY = "hdi_permission";
private static final String OLD_PERMISSION_METADATA_KEY = "asv_permission";
private static final String IS_FOLDER_METADATA_KEY = "hdi_isfolder";
@@ -232,6 +246,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
/**
+ * Default values to control SAS Key mode.
+ * By default we set the values to false.
+ */
+ private static final boolean DEFAULT_USE_SECURE_MODE = false;
+ private static final boolean DEFAULT_USE_LOCAL_SAS_KEY_MODE = false;
+
+ /**
* Enable flat listing of blobs as default option. This is useful only if
* listing depth is AZURE_UNBOUNDED_DEPTH.
*/
@@ -278,6 +299,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Set if we're running against a storage emulator..
private boolean isStorageEmulator = false;
+ // Configs controlling WASB SAS key mode.
+ private boolean useSecureMode = false;
+ private boolean useLocalSasKeyMode = false;
+
+ private String delegationToken;
/**
* A test hook interface that can modify the operation context we use for
* Azure Storage operations, e.g. to inject errors.
@@ -410,16 +436,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation)
throws IllegalArgumentException, AzureException, IOException {
-
+
if (null == instrumentation) {
throw new IllegalArgumentException("Null instrumentation");
}
this.instrumentation = instrumentation;
- if (null == this.storageInteractionLayer) {
- this.storageInteractionLayer = new StorageInterfaceImpl();
- }
-
// Check that URI exists.
//
if (null == uri) {
@@ -446,6 +468,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
sessionUri = uri;
sessionConfiguration = conf;
+ useSecureMode = conf.getBoolean(KEY_USE_SECURE_MODE,
+ DEFAULT_USE_SECURE_MODE);
+ useLocalSasKeyMode = conf.getBoolean(KEY_USE_LOCAL_SAS_KEY_MODE,
+ DEFAULT_USE_LOCAL_SAS_KEY_MODE);
+
+ if (null == this.storageInteractionLayer) {
+ if (!useSecureMode) {
+ this.storageInteractionLayer = new StorageInterfaceImpl();
+ } else {
+ this.storageInteractionLayer = new SecureStorageInterfaceImpl(
+ useLocalSasKeyMode, conf, delegationToken);
+ }
+ }
+
// Start an Azure storage session.
//
createAzureStorageSession();
@@ -791,6 +827,31 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
/**
+ * Method to set up the Storage Interaction layer in Secure mode.
+ * @param accountName - Storage account provided in the initializer
+ * @param containerName - Container name provided in the initializer
+ * @param sessionUri - URI provided in the initializer
+ */
+ private void connectToAzureStorageInSecureMode(String accountName,
+ String containerName, URI sessionUri)
+ throws AzureException, StorageException, URISyntaxException {
+
+ // Assertion: storageInteractionLayer instance has to be a SecureStorageInterfaceImpl
+ if (!(this.storageInteractionLayer instanceof SecureStorageInterfaceImpl)) {
+ throw new AssertionError("connectToAzureStorageInSecureMode() should be called only"
+ + " for SecureStorageInterfaceImpl instances");
+ }
+
+ ((SecureStorageInterfaceImpl) this.storageInteractionLayer).
+ setStorageAccountName(accountName);
+
+ container = storageInteractionLayer.getContainerReference(containerName);
+ rootDirectory = container.getDirectoryReference("");
+
+ canCreateOrModifyContainer = true;
+ }
+
+ /**
* Connect to Azure storage using account key credentials.
*/
private void connectUsingConnectionStringCredentials(
@@ -920,6 +981,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return;
}
+ // If the secure mode flag is set, WASB uses a SecureStorageInterfaceImpl instance
+ // to communicate with Azure storage. In SecureStorageInterfaceImpl SAS keys
+ // are used to communicate with Azure storage, so connectToAzureStorageInSecureMode
+ // instantiates the default container using a SAS Key.
+ if (useSecureMode) {
+ connectToAzureStorageInSecureMode(accountName, containerName, sessionUri);
+ return;
+ }
+
// Check whether we have a shared access signature for that container.
String propertyValue = sessionConfiguration.get(KEY_ACCOUNT_SAS_PREFIX
+ containerName + "." + accountName);
@@ -1330,7 +1400,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
*/
private OutputStream openOutputStream(final CloudBlobWrapper blob)
throws StorageException {
- if (blob instanceof CloudPageBlobWrapperImpl){
+ if (blob instanceof CloudPageBlobWrapper){
return new PageBlobOutputStream(
(CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration);
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java
new file mode 100644
index 0000000..e6f1597
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/LocalSASKeyGeneratorImpl.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPermissions;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import com.microsoft.azure.storage.SharedAccessAccountResourceType;
+import com.microsoft.azure.storage.SharedAccessAccountService;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+/***
+ * Local SAS Key Generation implementation. This class resides in
+ * the same address space as the WASB driver.
+ *
+ * This class is typically used for testing purposes.
+ *
+ */
+
+public class LocalSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
+
+ /**
+ * Map to cache CloudStorageAccount instances.
+ */
+ private Map<String, CloudStorageAccount> storageAccountMap;
+
+ private static final int HOURS_IN_DAY = 24;
+ public LocalSASKeyGeneratorImpl(Configuration conf) {
+ super(conf);
+ storageAccountMap = new HashMap<String, CloudStorageAccount>();
+ }
+
+ /**
+ * Implementation to generate SAS Key for a container
+ */
+ @Override
+ public URI getContainerSASUri(String accountName, String container)
+ throws SASKeyGenerationException {
+
+ try {
+
+ CloudStorageAccount account =
+ getSASKeyBasedStorageAccountInstance(accountName);
+ CloudBlobClient client = account.createCloudBlobClient();
+ return client.getCredentials().transformUri(
+ client.getContainerReference(container).getUri());
+
+ } catch (StorageException stoEx) {
+ throw new SASKeyGenerationException("Encountered StorageException while"
+ + " generating SAS Key for container " + container + " inside "
+ + "storage account " + accountName, stoEx);
+ } catch (URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException while"
+ + " generating SAS Key for container " + container + " inside storage"
+ + " account " + accountName, uriSyntaxEx);
+ }
+ }
+
+ /**
+ * Helper method that creates a CloudStorageAccount instance based on
+ * SAS key for accountName
+ *
+ * @param accountName Storage Account Name
+ * @return CloudStorageAccount instance created using SAS key for
+ * the Storage Account.
+ * @throws SASKeyGenerationException
+ */
+ private CloudStorageAccount getSASKeyBasedStorageAccountInstance(
+ String accountName) throws SASKeyGenerationException {
+
+ try {
+
+ String accountNameWithoutDomain =
+ getAccountNameWithoutDomain(accountName);
+
+ CloudStorageAccount account =
+ getStorageAccountInstance(accountNameWithoutDomain,
+ AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+ accountName, getConf()));
+
+ return new CloudStorageAccount(
+ new StorageCredentialsSharedAccessSignature(
+ account.generateSharedAccessSignature(
+ getDefaultAccountAccessPolicy())), false,
+ account.getEndpointSuffix(), accountNameWithoutDomain);
+
+ } catch (KeyProviderException keyProviderEx) {
+ throw new SASKeyGenerationException("Encountered KeyProviderException"
+ + " while retrieving Storage key from configuration for account "
+ + accountName, keyProviderEx);
+ } catch (InvalidKeyException invalidKeyEx) {
+ throw new SASKeyGenerationException("Encoutered InvalidKeyException "
+ + "while generating Account level SAS key for account" + accountName,
+ invalidKeyEx);
+ } catch(StorageException storeEx) {
+ throw new SASKeyGenerationException("Encoutered StorageException while "
+ + "generating Account level SAS key for account" + accountName,
+ storeEx);
+ } catch(URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException for"
+ + " account " + accountName, uriSyntaxEx);
+ }
+ }
+
+ /**
+ * Implementation for generation of Relative Path Blob SAS Uri.
+ */
+ @Override
+ public URI getRelativeBlobSASUri(String accountName, String container,
+ String relativePath) throws SASKeyGenerationException {
+
+ CloudBlobContainer sc = null;
+ CloudBlobClient client = null;
+ try {
+ CloudStorageAccount account =
+ getSASKeyBasedStorageAccountInstance(accountName);
+ client = account.createCloudBlobClient();
+ sc = client.getContainerReference(container);
+ } catch (URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException "
+ + "while getting container references for container " + container
+ + " inside storage account : " + accountName, uriSyntaxEx);
+ } catch (StorageException stoEx) {
+ throw new SASKeyGenerationException("Encountered StorageException while "
+ + "getting container references for container " + container
+ + " inside storage account : " + accountName, stoEx);
+ }
+
+ CloudBlockBlob blob = null;
+ try {
+ blob = sc.getBlockBlobReference(relativePath);
+ } catch (URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException while "
+ + "getting Block Blob references for container " + container
+ + " inside storage account : " + accountName, uriSyntaxEx);
+ } catch (StorageException stoEx) {
+ throw new SASKeyGenerationException("Encountered StorageException while "
+ + "getting Block Blob references for container " + container
+ + " inside storage account : " + accountName, stoEx);
+ }
+
+ try {
+ return client.getCredentials().transformUri(blob.getUri());
+ } catch (StorageException stoEx) {
+ throw new SASKeyGenerationException("Encountered StorageException while "
+ + "generating SAS key for Blob: " + relativePath + " inside "
+ + "container : " + container + " in Storage Account : " + accountName,
+ stoEx);
+ } catch (URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException "
+ + "while generating SAS key for Blob: " + relativePath + " inside "
+ + "container: " + container + " in Storage Account : " + accountName,
+ uriSyntaxEx);
+ }
+ }
+
+ /**
+ * Helper method that creates CloudStorageAccount Instance using the
+ * storage account key.
+ * @param accountName Name of the storage account
+ * @param accountKey Storage Account key
+ * @return CloudStorageAccount instance for the storage account.
+ * @throws SASKeyGenerationException
+ */
+ private CloudStorageAccount getStorageAccountInstance(String accountName,
+ String accountKey) throws SASKeyGenerationException {
+
+ if (!storageAccountMap.containsKey(accountName)) {
+
+ CloudStorageAccount account = null;
+ try {
+ account =
+ new CloudStorageAccount(new StorageCredentialsAccountAndKey(
+ accountName, accountKey));
+ } catch (URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException "
+ + "for account " + accountName, uriSyntaxEx);
+ }
+
+ storageAccountMap.put(accountName, account);
+ }
+
+ return storageAccountMap.get(accountName);
+ }
+
+ /**
+ * Helper method that returns the Storage account name without
+ * the domain name suffix.
+ * @param fullAccountName Storage account name with domain name suffix
+ * @return String
+ */
+ private String getAccountNameWithoutDomain(String fullAccountName) {
+ StringTokenizer tokenizer = new StringTokenizer(fullAccountName, ".");
+ return tokenizer.nextToken();
+ }
+
+ /**
+ * Helper method to generate Access Policy for the Storage Account SAS Key
+ * @return SharedAccessAccountPolicy
+ */
+ private SharedAccessAccountPolicy getDefaultAccountAccessPolicy() {
+
+ SharedAccessAccountPolicy ap =
+ new SharedAccessAccountPolicy();
+
+ Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
+ cal.setTime(new Date());
+ cal.add(Calendar.HOUR, (int) getSasKeyExpiryPeriod() * HOURS_IN_DAY);
+
+ ap.setSharedAccessExpiryTime(cal.getTime());
+ ap.setPermissions(getDefaultAccoutSASKeyPermissions());
+ ap.setResourceTypes(EnumSet.of(SharedAccessAccountResourceType.CONTAINER,
+ SharedAccessAccountResourceType.OBJECT));
+ ap.setServices(EnumSet.of(SharedAccessAccountService.BLOB));
+
+ return ap;
+ }
+
+ private EnumSet<SharedAccessAccountPermissions> getDefaultAccoutSASKeyPermissions() {
+ return EnumSet.of(SharedAccessAccountPermissions.ADD,
+ SharedAccessAccountPermissions.CREATE,
+ SharedAccessAccountPermissions.DELETE,
+ SharedAccessAccountPermissions.LIST,
+ SharedAccessAccountPermissions.READ,
+ SharedAccessAccountPermissions.UPDATE,
+ SharedAccessAccountPermissions.WRITE);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
new file mode 100644
index 0000000..404419d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
+
+/**
+ * Class implementing a remote SAS key generator. This class
+ * uses the URL passed in via the Configuration to make a
+ * REST call to generate the required SAS key.
+ */
+public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
+ /**
+ * Configuration parameter name expected in the Configuration
+ * object to provide the url of the remote service {@value}
+ */
+ private static final String KEY_CRED_SERVICE_URL =
+ "fs.azure.cred.service.url";
+
+ /**
+ * Container SAS Key generation OP name. {@value}
+ */
+ private static final String CONTAINER_SAS_OP = "GET_CONTAINER_SAS";
+
+ /**
+ * Relative Blob SAS Key generation OP name. {@value}
+ */
+ private static final String BLOB_SAS_OP = "GET_RELATIVE_BLOB_SAS";
+
+ /**
+ * Query parameter specifying the expiry period to be used for the SAS key
+ * {@value}
+ */
+ private static final String SAS_EXPIRY_QUERY_PARAM_NAME = "sas_expiry";
+
+ /**
+ * Query parameter name for the storage account. {@value}
+ */
+ private static final String STORAGE_ACCOUNT_QUERY_PARAM_NAME =
+ "storage_account";
+
+ /**
+ * Query parameter name for the storage account container. {@value}
+ */
+ private static final String CONTAINER_QUERY_PARAM_NAME =
+ "container";
+
+ /**
+ * Query parameter name for user info {@value}
+ */
+ private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
+ "delegation_token";
+
+ /**
+ * Query parameter name for the relative path inside the storage
+ * account container. {@value}
+ */
+ private static final String RELATIVE_PATH_QUERY_PARAM_NAME =
+ "relative_path";
+
+ private String delegationToken = "";
+ private String credServiceUrl = "";
+ private WasbRemoteCallHelper remoteCallHelper = null;
+
+ public RemoteSASKeyGeneratorImpl(Configuration conf) {
+ super(conf);
+ }
+
+ public boolean initialize(Configuration conf, String delegationToken) {
+
+ LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
+ credServiceUrl = conf.get(KEY_CRED_SERVICE_URL);
+
+ if (delegationToken == null || delegationToken.isEmpty()) {
+ LOG.error("Delegation Token not provided for initialization"
+ + " of RemoteSASKeyGenerator");
+ return false;
+ }
+
+ this.delegationToken = delegationToken;
+
+ if (credServiceUrl == null || credServiceUrl.isEmpty()) {
+ LOG.error("CredService Url not found in configuration to initialize"
+ + " RemoteSASKeyGenerator");
+ return false;
+ }
+
+ remoteCallHelper = new WasbRemoteCallHelper();
+ LOG.debug("Initialization of RemoteSASKeyGenerator instance successfull");
+ return true;
+ }
+
+ @Override
+ public URI getContainerSASUri(String storageAccount, String container)
+ throws SASKeyGenerationException {
+
+ try {
+
+ LOG.debug("Generating Container SAS Key for Container {} "
+ + "inside Storage Account {} ", container, storageAccount);
+ URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
+ uriBuilder.setPath("/" + CONTAINER_SAS_OP);
+ uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
+ storageAccount);
+ uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
+ container);
+ uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ + getSasKeyExpiryPeriod());
+ uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
+ this.delegationToken);
+
+ RemoteSASKeyGenerationResponse sasKeyResponse =
+ makeRemoteRequest(uriBuilder.build());
+
+ if (sasKeyResponse == null) {
+ throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+ + " object null from remote call");
+ } else if (sasKeyResponse.getResponseCode()
+ == REMOTE_CALL_SUCCESS_CODE) {
+ return new URI(sasKeyResponse.getSasKey());
+ } else {
+ throw new SASKeyGenerationException("Remote Service encountered error"
+ + " in SAS Key generation : "
+ + sasKeyResponse.getResponseMessage());
+ }
+ } catch (URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException "
+ + "while building the HttpGetRequest to remote cred service",
+ uriSyntaxEx);
+ }
+ }
+
+ @Override
+ public URI getRelativeBlobSASUri(String storageAccount, String container,
+ String relativePath) throws SASKeyGenerationException {
+
+ try {
+
+ LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
+ + " Container {} inside Storage Account {} ",
+ relativePath, container, storageAccount);
+ URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
+ uriBuilder.setPath("/" + BLOB_SAS_OP);
+ uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
+ storageAccount);
+ uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
+ container);
+ uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME,
+ relativePath);
+ uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ + getSasKeyExpiryPeriod());
+ uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
+ this.delegationToken);
+
+ RemoteSASKeyGenerationResponse sasKeyResponse =
+ makeRemoteRequest(uriBuilder.build());
+
+ if (sasKeyResponse == null) {
+ throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+ + " object null from remote call");
+ } else if (sasKeyResponse.getResponseCode()
+ == REMOTE_CALL_SUCCESS_CODE) {
+ return new URI(sasKeyResponse.getSasKey());
+ } else {
+ throw new SASKeyGenerationException("Remote Service encountered error"
+ + " in SAS Key generation : "
+ + sasKeyResponse.getResponseMessage());
+ }
+ } catch (URISyntaxException uriSyntaxEx) {
+ throw new SASKeyGenerationException("Encountered URISyntaxException"
+ + " while building the HttpGetRequest to " + " remote service",
+ uriSyntaxEx);
+ }
+ }
+
+ /**
+ * Helper method to make a remote request.
+ * @param uri - Uri to use for the remote request
+ * @return RemoteSASKeyGenerationResponse
+ */
+ private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri)
+ throws SASKeyGenerationException {
+
+ try {
+ String responseBody =
+ remoteCallHelper.makeRemoteGetRequest(new HttpGet(uri));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readValue(responseBody,
+ RemoteSASKeyGenerationResponse.class);
+
+ } catch (WasbRemoteCallException remoteCallEx) {
+ throw new SASKeyGenerationException("Encountered RemoteCallException"
+ + " while retrieving SAS key from remote service", remoteCallEx);
+ } catch (JsonParseException jsonParserEx) {
+ throw new SASKeyGenerationException("Encountered JsonParseException "
+ + "while parsing the response from remote"
+ + " service into RemoteSASKeyGenerationResponse object", jsonParserEx);
+ } catch (JsonMappingException jsonMappingEx) {
+ throw new SASKeyGenerationException("Encountered JsonMappingException"
+ + " while mapping the response from remote service into "
+ + "RemoteSASKeyGenerationResponse object", jsonMappingEx);
+ } catch (IOException ioEx) {
+ throw new SASKeyGenerationException("Encountered IOException while "
+ + "accessing remote service to retrieve SAS Key", ioEx);
+ }
+ }
+}
+
+/**
+ * POJO representing the response expected from a Remote
+ * SAS Key generation service.
+ * The remote SAS Key generation service is expected to
+ * return SAS key in json format:
+ * {
+ * "responseCode" : 0 or non-zero <int>,
+ * "responseMessage" : relavant message on failure <String>,
+ * "sasKey" : Requested SAS Key <String>
+ * }
+ */
+class RemoteSASKeyGenerationResponse {
+
+ /**
+ * Response code for the call.
+ */
+ private int responseCode;
+
+ /**
+ * A message corresponding to the result;
+ * specifically, in case of failure,
+ * the reason for the failure.
+ */
+ private String responseMessage;
+
+ /**
+ * SAS Key corresponding to the request.
+ */
+ private String sasKey;
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+ public void setResponseCode(int responseCode) {
+ this.responseCode = responseCode;
+ }
+
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ public void setResponseMessage(String responseMessage) {
+ this.responseMessage = responseMessage;
+ }
+
+ public String getSasKey() {
+ return sasKey;
+ }
+
+ public void setSasKey(String sasKey) {
+ this.sasKey = sasKey;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java
new file mode 100644
index 0000000..7cfafc3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGenerationException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Exception that gets thrown during generation of SAS Key.
+ *
+ */
+public class SASKeyGenerationException extends AzureException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SASKeyGenerationException(String message) {
+ super(message);
+ }
+
+ public SASKeyGenerationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SASKeyGenerationException(Throwable t) {
+ super(t);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java
new file mode 100644
index 0000000..4acd6e4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Abstract base class for SAS key generator implementations.
+ */
+public abstract class SASKeyGeneratorImpl implements SASKeyGeneratorInterface {
+
+ /**
+   * Configuration key to be used to specify the expiry period for SAS keys.
+   * This value is currently specified in days. {@value}
+ */
+ public static final String KEY_SAS_KEY_EXPIRY_PERIOD =
+ "fs.azure.sas.expiry.period";
+
+ /**
+ * Default value for the SAS key expiry period in days. {@value}
+ */
+  public static final long DEFAULT_CONTAINER_SAS_KEY_PERIOD = 90;
+
+ private long sasKeyExpiryPeriod;
+
+ private Configuration conf;
+
+ public SASKeyGeneratorImpl(Configuration conf) {
+ this.conf = conf;
+ this.sasKeyExpiryPeriod = conf.getTimeDuration(
+        KEY_SAS_KEY_EXPIRY_PERIOD, DEFAULT_CONTAINER_SAS_KEY_PERIOD,
+ TimeUnit.DAYS);
+ }
+
+ public long getSasKeyExpiryPeriod() {
+ return sasKeyExpiryPeriod;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+}
\ No newline at end of file
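For illustration, a minimal hypothetical subclass of SASKeyGeneratorImpl might look like the sketch below. It returns pre-provisioned SAS URIs from configuration instead of generating real keys; the class name and the fs.azure.test.fixed.sas.* configuration keys are invented for this sketch, and java.net.URI / java.net.URISyntaxException imports are assumed.

    public class FixedSASKeyGenerator extends SASKeyGeneratorImpl {

      public FixedSASKeyGenerator(Configuration conf) {
        super(conf);
      }

      @Override
      public URI getContainerSASUri(String accountName, String container)
          throws SASKeyGenerationException {
        return lookup("fs.azure.test.fixed.sas." + accountName
            + "." + container);
      }

      @Override
      public URI getRelativeBlobSASUri(String accountName, String container,
          String relativePath) throws SASKeyGenerationException {
        return lookup("fs.azure.test.fixed.sas." + accountName
            + "." + container + "." + relativePath);
      }

      // Resolve a pre-provisioned SAS URI from configuration, failing with
      // the patch's own exception type when it is absent or malformed.
      private URI lookup(String key) throws SASKeyGenerationException {
        String value = getConf().get(key);
        if (value == null) {
          throw new SASKeyGenerationException(
              "No SAS URI configured for " + key);
        }
        try {
          return new URI(value);
        } catch (URISyntaxException e) {
          throw new SASKeyGenerationException(
              "Malformed SAS URI for " + key, e);
        }
      }
    }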
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java
new file mode 100644
index 0000000..8d871eb
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SASKeyGeneratorInterface.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+
+/**
+ * Interface used by AzureNativeFileSystemStore to retrieve SAS keys for the
+ * respective Azure storage entity. This interface is expected to be
+ * implemented in two modes:
+ * 1) Local mode: In this mode SAS keys are generated
+ * in the same address space as WASB. This is primarily used for
+ * testing purposes.
+ * 2) Remote mode: In this mode SAS keys are generated in a separate process
+ * from WASB and are communicated to it via a client.
+ */
+public interface SASKeyGeneratorInterface {
+
+ /**
+ * Interface method to retrieve SAS Key for a container within the storage
+ * account.
+ *
+ * @param accountName
+ * - Storage account name
+ * @param container
+ * - Container name within the storage account.
+ * @return SAS URI for the container.
+   * @throws SASKeyGenerationException if the SAS key cannot be generated
+ */
+ URI getContainerSASUri(String accountName, String container)
+ throws SASKeyGenerationException;
+
+ /**
+ * Interface method to retrieve SAS Key for a blob within the container of the
+ * storage account.
+ *
+ * @param accountName
+ * - Storage account name
+ * @param container
+ * - Container name within the storage account.
+ * @param relativePath
+ * - Relative path within the container
+ * @return SAS URI for the relative path blob.
+   * @throws SASKeyGenerationException if the SAS key cannot be generated
+ */
+ URI getRelativeBlobSASUri(String accountName, String container,
+ String relativePath) throws SASKeyGenerationException;
+}
\ No newline at end of file
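As a usage sketch (account and container names are placeholders, and com.microsoft.azure.storage imports are assumed): the SAS URI returned by the generator embeds the credentials, so it can be turned directly into an SDK container reference, which is what SecureStorageInterfaceImpl does further down in this patch.

    static CloudBlobContainer openContainer(SASKeyGeneratorInterface generator)
        throws SASKeyGenerationException, StorageException, URISyntaxException {
      // No storage account key is needed; the SAS URI carries the credential.
      URI sasUri = generator.getContainerSASUri("myaccount", "mycontainer");
      return new CloudBlobContainer(sasUri);
    }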
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java
new file mode 100644
index 0000000..5bec77d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureModeException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Exception that is thrown when any error is encountered
+ * in SAS Mode operation of WASB.
+ */
+public class SecureModeException extends AzureException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SecureModeException(String message) {
+ super(message);
+ }
+
+ public SecureModeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SecureModeException(Throwable t) {
+ super(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
new file mode 100644
index 0000000..6749a76
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.RetryPolicyFactory;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageUri;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import com.microsoft.azure.storage.blob.CopyState;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.PageRange;
+import com.microsoft.azure.storage.blob.BlockEntry;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An implementation of the StorageInterface for SAS Key mode.
+ */
+public class SecureStorageInterfaceImpl extends StorageInterface {
+
+ public static final Logger LOG = LoggerFactory.getLogger(
+ SecureStorageInterfaceImpl.class);
+ public static final String SAS_ERROR_CODE = "SAS Error";
+ private SASKeyGeneratorInterface sasKeyGenerator;
+ private String storageAccount;
+ private String delegationToken;
+
+ public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
+ Configuration conf, String delegationToken)
+ throws SecureModeException {
+
+ this.delegationToken = delegationToken;
+ if (useLocalSASKeyMode) {
+ this.sasKeyGenerator = new LocalSASKeyGeneratorImpl(conf);
+ } else {
+ RemoteSASKeyGeneratorImpl remoteSasKeyGenerator =
+ new RemoteSASKeyGeneratorImpl(conf);
+ if (!remoteSasKeyGenerator.initialize(conf, this.delegationToken)) {
+ throw new SecureModeException("Remote SAS Key mode could"
+ + " not be initialized");
+ }
+ this.sasKeyGenerator = remoteSasKeyGenerator;
+ }
+ }
+
+ @Override
+ public void setTimeoutInMs(int timeoutInMs) {
+ }
+
+ @Override
+ public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) {
+ }
+
+ @Override
+ public void createBlobClient(CloudStorageAccount account) {
+ String errorMsg = "createBlobClient is an invalid operation in"
+ + " SAS Key Mode";
+ LOG.error(errorMsg);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+
+ @Override
+ public void createBlobClient(URI baseUri) {
+ String errorMsg = "createBlobClient is an invalid operation in "
+ + "SAS Key Mode";
+ LOG.error(errorMsg);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+
+ @Override
+ public void createBlobClient(URI baseUri, StorageCredentials credentials) {
+ String errorMsg = "createBlobClient is an invalid operation in SAS "
+ + "Key Mode";
+ LOG.error(errorMsg);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+
+ @Override
+ public StorageCredentials getCredentials() {
+ String errorMsg = "getCredentials is an invalid operation in SAS "
+ + "Key Mode";
+ LOG.error(errorMsg);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+
+ @Override
+ public CloudBlobContainerWrapper getContainerReference(String name)
+ throws URISyntaxException, StorageException {
+
+ try {
+ return new SASCloudBlobContainerWrapperImpl(storageAccount,
+ new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
+ storageAccount, name)), sasKeyGenerator);
+ } catch (SASKeyGenerationException sasEx) {
+ String errorMsg = "Encountered SASKeyGeneration exception while "
+ + "generating SAS Key for container : " + name
+ + " inside Storage account : " + storageAccount;
+ LOG.error(errorMsg);
+ throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
+ }
+ }
+
+ public void setStorageAccountName(String storageAccount) {
+ this.storageAccount = storageAccount;
+ }
+
+ @InterfaceAudience.Private
+ static class SASCloudBlobContainerWrapperImpl
+ extends CloudBlobContainerWrapper {
+
+ private final CloudBlobContainer container;
+ private String storageAccount;
+ private SASKeyGeneratorInterface sasKeyGenerator;
+
+ public SASCloudBlobContainerWrapperImpl(String storageAccount,
+ CloudBlobContainer container, SASKeyGeneratorInterface sasKeyGenerator) {
+ this.storageAccount = storageAccount;
+ this.container = container;
+ this.sasKeyGenerator = sasKeyGenerator;
+ }
+
+ @Override
+ public String getName() {
+ return container.getName();
+ }
+
+ @Override
+ public boolean exists(OperationContext opContext) throws StorageException {
+ return container.exists(AccessCondition.generateEmptyCondition(), null,
+ opContext);
+ }
+
+ @Override
+ public void create(OperationContext opContext) throws StorageException {
+ container.create(null, opContext);
+ }
+
+ @Override
+ public HashMap<String, String> getMetadata() {
+ return container.getMetadata();
+ }
+
+ @Override
+ public void setMetadata(HashMap<String, String> metadata) {
+ container.setMetadata(metadata);
+ }
+
+ @Override
+ public void downloadAttributes(OperationContext opContext)
+ throws StorageException {
+ container.downloadAttributes(AccessCondition.generateEmptyCondition(),
+ null, opContext);
+ }
+
+ @Override
+ public void uploadMetadata(OperationContext opContext)
+ throws StorageException {
+ container.uploadMetadata(AccessCondition.generateEmptyCondition(), null,
+ opContext);
+ }
+
+ @Override
+ public CloudBlobDirectoryWrapper getDirectoryReference(String relativePath)
+ throws URISyntaxException, StorageException {
+
+ CloudBlobDirectory dir = container.getDirectoryReference(relativePath);
+ return new SASCloudBlobDirectoryWrapperImpl(dir);
+ }
+
+ @Override
+ public CloudBlobWrapper getBlockBlobReference(String relativePath)
+ throws URISyntaxException, StorageException {
+ try {
+ return new SASCloudBlockBlobWrapperImpl(
+ new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(
+ storageAccount, getName(), relativePath)));
+ } catch (SASKeyGenerationException sasEx) {
+ String errorMsg = "Encountered SASKeyGeneration exception while "
+ + "generating SAS Key for relativePath : " + relativePath
+ + " inside container : " + getName() + " Storage account : " + storageAccount;
+ LOG.error(errorMsg);
+ throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
+ }
+ }
+
+ @Override
+ public CloudBlobWrapper getPageBlobReference(String relativePath)
+ throws URISyntaxException, StorageException {
+ try {
+ return new SASCloudPageBlobWrapperImpl(
+ new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(
+ storageAccount, getName(), relativePath)));
+ } catch (SASKeyGenerationException sasEx) {
+ String errorMsg = "Encountered SASKeyGeneration exception while "
+ + "generating SAS Key for relativePath : " + relativePath
+ + " inside container : " + getName()
+ + " Storage account : " + storageAccount;
+ LOG.error(errorMsg);
+ throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
+ }
+ }
+ }
+
+ //
+ // WrappingIterator
+ //
+
+ /**
+   * This iterator wraps each ListBlobItem returned from the listBlobs()
+   * calls in its proper wrapper object.
+ */
+ private static class SASWrappingIterator implements Iterator<ListBlobItem> {
+ private final Iterator<ListBlobItem> present;
+
+ public SASWrappingIterator(Iterator<ListBlobItem> present) {
+ this.present = present;
+ }
+
+ public static Iterable<ListBlobItem> wrap(
+ final Iterable<ListBlobItem> present) {
+ return new Iterable<ListBlobItem>() {
+ @Override
+ public Iterator<ListBlobItem> iterator() {
+ return new SASWrappingIterator(present.iterator());
+ }
+ };
+ }
+
+ @Override
+ public boolean hasNext() {
+ return present.hasNext();
+ }
+
+ @Override
+ public ListBlobItem next() {
+ ListBlobItem unwrapped = present.next();
+ if (unwrapped instanceof CloudBlobDirectory) {
+ return new SASCloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped);
+ } else if (unwrapped instanceof CloudBlockBlob) {
+ return new SASCloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped);
+ } else if (unwrapped instanceof CloudPageBlob) {
+ return new SASCloudPageBlobWrapperImpl((CloudPageBlob) unwrapped);
+ } else {
+ return unwrapped;
+ }
+ }
+
+ @Override
+ public void remove() {
+ present.remove();
+ }
+ }
+
+ //
+ // CloudBlobDirectoryWrapperImpl
+ //
+ @InterfaceAudience.Private
+ static class SASCloudBlobDirectoryWrapperImpl extends CloudBlobDirectoryWrapper {
+ private final CloudBlobDirectory directory;
+
+ public SASCloudBlobDirectoryWrapperImpl(CloudBlobDirectory directory) {
+ this.directory = directory;
+ }
+
+ @Override
+ public URI getUri() {
+ return directory.getUri();
+ }
+
+ @Override
+ public Iterable<ListBlobItem> listBlobs(String prefix,
+ boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
+ BlobRequestOptions options, OperationContext opContext)
+ throws URISyntaxException, StorageException {
+ return SASWrappingIterator.wrap(directory.listBlobs(prefix,
+ useFlatBlobListing, listingDetails, options, opContext));
+ }
+
+ @Override
+ public CloudBlobContainer getContainer() throws URISyntaxException,
+ StorageException {
+ return directory.getContainer();
+ }
+
+ @Override
+ public CloudBlobDirectory getParent() throws URISyntaxException,
+ StorageException {
+ return directory.getParent();
+ }
+
+ @Override
+ public StorageUri getStorageUri() {
+ return directory.getStorageUri();
+ }
+ }
+
+ abstract static class SASCloudBlobWrapperImpl implements CloudBlobWrapper {
+ private final CloudBlob blob;
+ @Override
+ public CloudBlob getBlob() {
+ return blob;
+ }
+
+ public URI getUri() {
+ return getBlob().getUri();
+ }
+
+ protected SASCloudBlobWrapperImpl(CloudBlob blob) {
+ this.blob = blob;
+ }
+
+ @Override
+ public HashMap<String, String> getMetadata() {
+ return getBlob().getMetadata();
+ }
+
+ @Override
+ public void delete(OperationContext opContext, SelfRenewingLease lease)
+ throws StorageException {
+ getBlob().delete(DeleteSnapshotsOption.NONE, getLeaseCondition(lease),
+ null, opContext);
+ }
+
+ /**
+     * Return an access condition for this lease, or null if
+     * there is no lease.
+ */
+ private AccessCondition getLeaseCondition(SelfRenewingLease lease) {
+ AccessCondition leaseCondition = null;
+ if (lease != null) {
+ leaseCondition = AccessCondition.generateLeaseCondition(lease.getLeaseID());
+ }
+ return leaseCondition;
+ }
+
+ @Override
+ public boolean exists(OperationContext opContext)
+ throws StorageException {
+ return getBlob().exists(null, null, opContext);
+ }
+
+ @Override
+ public void downloadAttributes(
+ OperationContext opContext) throws StorageException {
+ getBlob().downloadAttributes(null, null, opContext);
+ }
+
+ @Override
+ public BlobProperties getProperties() {
+ return getBlob().getProperties();
+ }
+
+ @Override
+ public void setMetadata(HashMap<String, String> metadata) {
+ getBlob().setMetadata(metadata);
+ }
+
+ @Override
+ public InputStream openInputStream(
+ BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ return getBlob().openInputStream(null, options, opContext);
+ }
+
+ public OutputStream openOutputStream(
+ BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
+ }
+
+ public void upload(InputStream sourceStream, OperationContext opContext)
+ throws StorageException, IOException {
+ getBlob().upload(sourceStream, 0, null, null, opContext);
+ }
+
+ @Override
+ public CloudBlobContainer getContainer() throws URISyntaxException,
+ StorageException {
+ return getBlob().getContainer();
+ }
+
+ @Override
+ public CloudBlobDirectory getParent() throws URISyntaxException,
+ StorageException {
+ return getBlob().getParent();
+ }
+
+ @Override
+ public void uploadMetadata(OperationContext opContext)
+ throws StorageException {
+ uploadMetadata(null, null, opContext);
+ }
+
+ @Override
+ public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException{
+ getBlob().uploadMetadata(accessConditions, options, opContext);
+ }
+
+ public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
+ throws StorageException {
+
+ // Include lease in request if lease not null.
+ getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
+ }
+
+ @Override
+ public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
+ getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
+ }
+
+ @Override
+ public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
+ getBlob().setStreamWriteSizeInBytes(writeBlockSizeBytes);
+ }
+
+ @Override
+ public StorageUri getStorageUri() {
+ return getBlob().getStorageUri();
+ }
+
+ @Override
+ public CopyState getCopyState() {
+ return getBlob().getCopyState();
+ }
+
+ @Override
+ public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
+ OperationContext opContext)
+ throws StorageException, URISyntaxException {
+ getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
+ null, null, options, opContext);
+ }
+
+ @Override
+ public void downloadRange(long offset, long length, OutputStream outStream,
+ BlobRequestOptions options, OperationContext opContext)
+ throws StorageException, IOException {
+
+ getBlob().downloadRange(offset, length, outStream, null, options, opContext);
+ }
+
+ @Override
+ public SelfRenewingLease acquireLease() throws StorageException {
+ return new SelfRenewingLease(this);
+ }
+ }
+
+ //
+ // CloudBlockBlobWrapperImpl
+ //
+
+ static class SASCloudBlockBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudBlockBlobWrapper {
+
+ public SASCloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
+ super(blob);
+ }
+
+ public OutputStream openOutputStream(
+ BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
+ }
+
+ public void upload(InputStream sourceStream, OperationContext opContext)
+ throws StorageException, IOException {
+ getBlob().upload(sourceStream, 0, null, null, opContext);
+ }
+
+ public void uploadProperties(OperationContext opContext)
+ throws StorageException {
+ getBlob().uploadProperties(null, null, opContext);
+ }
+
+ @Override
+ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
+
+ }
+
+ @Override
+ public void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+ }
+
+ @Override
+ public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
+ }
+ }
+
+ static class SASCloudPageBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudPageBlobWrapper {
+ public SASCloudPageBlobWrapperImpl(CloudPageBlob blob) {
+ super(blob);
+ }
+
+ public void create(final long length, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ ((CloudPageBlob) getBlob()).create(length, null, options, opContext);
+ }
+
+ public void uploadPages(final InputStream sourceStream, final long offset,
+ final long length, BlobRequestOptions options, OperationContext opContext)
+ throws StorageException, IOException {
+ ((CloudPageBlob) getBlob()).uploadPages(sourceStream, offset, length, null,
+ options, opContext);
+ }
+
+ public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ return ((CloudPageBlob) getBlob()).downloadPageRanges(
+ null, options, opContext);
+ }
+ }
+}
\ No newline at end of file
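A construction sketch under stated assumptions: local SAS key mode (so the delegation token is unused and passed as null), placeholder account/container names, and the nested wrapper type qualified via StorageInterface.

    StorageInterface.CloudBlobContainerWrapper openSecureContainer(
        Configuration conf) throws Exception {  // SecureModeException etc.
      SecureStorageInterfaceImpl store =
          new SecureStorageInterfaceImpl(true /* useLocalSASKeyMode */,
              conf, null);
      store.setStorageAccountName("myaccount");
      // Every container/blob handle returned from here on is backed by a
      // SAS URI obtained from the configured key generator.
      return store.getContainerReference("mycontainer");
    }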
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java
new file mode 100644
index 0000000..43c1b61
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Exception that gets thrown when a remote call
+ * made from WASB to an external credential service fails.
+ */
+
+public class WasbRemoteCallException extends AzureException {
+
+ private static final long serialVersionUID = 1L;
+
+ public WasbRemoteCallException(String message) {
+ super(message);
+ }
+
+ public WasbRemoteCallException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public WasbRemoteCallException(Throwable t) {
+ super(t);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
new file mode 100644
index 0000000..543c899
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+/**
+ * Helper class that has constants and helper methods
+ * used in WASB when integrating with a remote HTTP credential
+ * service. Currently, the remote service is used to generate
+ * SAS keys.
+ */
+class WasbRemoteCallHelper {
+
+ /**
+ * Return code when the remote call is successful. {@value}
+ */
+ public static final int REMOTE_CALL_SUCCESS_CODE = 0;
+
+ /**
+ * Client instance to be used for making the remote call.
+ */
+ private HttpClient client = null;
+
+ public WasbRemoteCallHelper() {
+ this.client = HttpClientBuilder.create().build();
+ }
+
+ /**
+ * Helper method to make remote HTTP Get request.
+ * @param getRequest - HttpGet request object constructed by caller.
+   * @return HTTP response body returned as a string. The caller
+   * is expected to semantically understand the response.
+   * @throws WasbRemoteCallException if the remote call fails
+ */
+ public String makeRemoteGetRequest(HttpGet getRequest)
+ throws WasbRemoteCallException {
+
+ try {
+
+ HttpResponse response = client.execute(getRequest);
+
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new WasbRemoteCallException(
+ response.getStatusLine().toString());
+ }
+
+      // try-with-resources ensures the reader is closed on all paths.
+      try (BufferedReader rd = new BufferedReader(
+          new InputStreamReader(response.getEntity().getContent(),
+              StandardCharsets.UTF_8))) {
+        StringBuilder responseBody = new StringBuilder();
+        String responseLine;
+        while ((responseLine = rd.readLine()) != null) {
+          responseBody.append(responseLine);
+        }
+        return responseBody.toString();
+      }
+
+ } catch (ClientProtocolException clientProtocolEx) {
+ throw new WasbRemoteCallException("Encountered ClientProtocolException"
+ + " while making remote call", clientProtocolEx);
+ } catch (IOException ioEx) {
+ throw new WasbRemoteCallException("Encountered IOException while making"
+ + " remote call", ioEx);
+ }
+ }
+}
\ No newline at end of file
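A hedged usage sketch from the same package (the class is package-private); the endpoint, account, container, and token values are placeholders, and the query format follows the remote SAS REST protocol documented in index.md below.

    String token = "<delegation token>";  // placeholder value
    HttpGet request = new HttpGet("http://localhost:8080/GET_CONTAINER_SAS"
        + "?storage_account=myaccount&container=mycontainer"
        + "&sas_expiry=90&delegation_token=" + token);
    String json = new WasbRemoteCallHelper().makeRemoteGetRequest(request);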
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 5d71795..2865223 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -285,6 +285,54 @@ To enable 20 threads for Rename operation. Set configuration value to 0 or 1 to
<value>20</value>
</property>
+### <a name="WASB_SECURE_MODE" />WASB Secure mode and configuration
+
+WASB can operate in secure mode, where the storage access keys required to communicate with Azure storage do not have to
+be in the same address space as the process using WASB. In this mode all interactions with Azure storage are performed using
+SAS URIs. There are two sub-modes within secure mode: remote SAS key mode, where the SAS keys are generated by
+a remote process, and local mode, where the SAS keys are generated within WASB. By default, secure mode runs in
+remote mode; for testing purposes, local mode can be enabled to generate SAS keys in the same process as WASB.
+
+To enable secure mode, the following property needs to be set to true:
+
+```
+ <property>
+ <name>fs.azure.secure.mode</name>
+ <value>true</value>
+ </property>
+```
+
+To enable SAS key generation locally, the following property needs to be set to true:
+
+```
+ <property>
+ <name>fs.azure.local.sas.key.mode</name>
+ <value>true</value>
+ </property>
+```
+To use the remote SAS key generation mode, an external REST service is expected to provide the required SAS keys.
+The following property can be used to configure the endpoint for remote SAS key generation:
+
+```
+ <property>
+ <name>fs.azure.cred.service.url</name>
+ <value>{URL}</value>
+ </property>
+```
+The remote service is expected to support two REST calls, ```{URL}/GET_CONTAINER_SAS``` and ```{URL}/GET_RELATIVE_BLOB_SAS```, for generating
+container and relative blob SAS keys. Example requests:
+
+```{URL}/GET_CONTAINER_SAS?storage_account=<account_name>&container=<container>&sas_expiry=<expiry period>&delegation_token=<delegation token>```
+```{URL}/GET_RELATIVE_BLOB_SAS?storage_account=<account_name>&container=<container>&relative_path=<relative path>&sas_expiry=<expiry period>&delegation_token=<delegation token>```
+
+The service is expected to return a response in JSON format:
+```
+{
+ "responseCode" : 0 or non-zero <int>,
+ "responseMessage" : relavant message on failure <String>,
+ "sasKey" : Requested SAS Key <String>
+}
+```
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests
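The getters shown at the top of this patch (getResponseMessage(), getSasKey()) suggest this JSON is bound to a response bean. A minimal sketch of that binding, assuming a hypothetical SasKeyResponse bean with the three documented fields and an org.codehaus.jackson ObjectMapper (already used elsewhere in this patch series):

    // Hypothetical bean mirroring the documented JSON fields; getters and
    // setters for responseCode/responseMessage/sasKey are omitted here.
    public class SasKeyResponse {
      private int responseCode;
      private String responseMessage;
      private String sasKey;
    }

    // Throws on a non-zero responseCode, otherwise returns the SAS URI.
    URI parseSasResponse(String json) throws Exception {
      SasKeyResponse resp =
          new ObjectMapper().readValue(json, SasKeyResponse.class);
      if (resp.getResponseCode()
          != WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE) {
        throw new WasbRemoteCallException(resp.getResponseMessage());
      }
      return new URI(resp.getSasKey());
    }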
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
index 56ec881..f6ab94d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestContainerChecks.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
import org.junit.Test;
import com.microsoft.azure.storage.blob.BlobOutputStream;
@@ -41,7 +43,7 @@ import com.microsoft.azure.storage.blob.CloudBlockBlob;
*/
public class TestContainerChecks {
private AzureBlobStorageTestAccount testAccount;
-
+ private boolean runningInSASMode = false;
@After
public void tearDown() throws Exception {
if (testAccount != null) {
@@ -50,6 +52,12 @@ public class TestContainerChecks {
}
}
+ @Before
+ public void setMode() {
+ runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
+ getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+ }
+
@Test
public void testContainerExistAfterDoesNotExist() throws Exception {
testAccount = AzureBlobStorageTestAccount.create("",
@@ -155,6 +163,8 @@ public class TestContainerChecks {
@Test
public void testContainerChecksWithSas() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
testAccount = AzureBlobStorageTestAccount.create("",
EnumSet.of(CreateOptions.UseSas));
assumeNotNull(testAccount);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
index cd9d1d4..9d2770e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbUriAndConfiguration.java
@@ -39,7 +39,6 @@ import java.io.File;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
@@ -48,6 +47,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -63,7 +64,7 @@ public class TestWasbUriAndConfiguration {
protected String accountName;
protected String accountKey;
protected static Configuration conf = null;
-
+ private boolean runningInSASMode = false;
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
@@ -77,6 +78,12 @@ public class TestWasbUriAndConfiguration {
}
}
+ @Before
+ public void setMode() {
+ runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
+ getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+ }
+
private boolean validateIOStreams(Path filePath) throws IOException {
// Capture the file system from the test account.
FileSystem fs = testAccount.getFileSystem();
@@ -128,6 +135,8 @@ public class TestWasbUriAndConfiguration {
@Test
public void testConnectUsingSAS() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
// Create the test account with SAS credentials.
testAccount = AzureBlobStorageTestAccount.create("",
EnumSet.of(CreateOptions.UseSas, CreateOptions.CreateContainer));
@@ -142,6 +151,8 @@ public class TestWasbUriAndConfiguration {
@Test
public void testConnectUsingSASReadonly() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
// Create the test account with SAS credentials.
testAccount = AzureBlobStorageTestAccount.create("", EnumSet.of(
CreateOptions.UseSas, CreateOptions.CreateContainer,
@@ -318,6 +329,8 @@ public class TestWasbUriAndConfiguration {
@Test
public void testCredsFromCredentialProvider() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
String account = "testacct";
String key = "testkey";
// set up conf to have a cred provider
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e92a7709/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
index 00611fc..e898aa6 100644
--- a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
+++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
@@ -17,15 +17,28 @@
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<!-- For tests against live azure, provide the following account information -->
+
<!--
<property>
<name>fs.azure.test.account.name</name>
- <value>{ACCOUNTNAME}.blob.core.windows.net</value>
+ <value>{ACCOUNTNAME}.blob.core.windows.net</value>
</property>
<property>
<name>fs.azure.account.key.{ACCOUNTNAME}.blob.core.windows.net</name>
<value>{ACCOUNTKEY}</value>
</property>
+ <property>
+ <name>fs.azure.secure.mode</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>fs.azure.local.sas.key.mode</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>fs.azure.cred.service.url</name>
+ <value>{CRED_SERVICE_URL}</value>
+ </property>
-->
<!-- Save the above configuration properties in a separate file named -->
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[07/14] hadoop git commit: YARN-6000. Make
AllocationFileLoaderService.Listener public. (Tao Jie via kasha)
Posted by xg...@apache.org.
YARN-6000. Make AllocationFileLoaderService.Listener public. (Tao Jie via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d3f73ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d3f73ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d3f73ac
Branch: refs/heads/YARN-5734
Commit: 4d3f73acc0a5cabc748132889dbe670bea178a3f
Parents: 4e90296
Author: Karthik Kambatla <ka...@apache.org>
Authored: Fri Dec 23 11:40:56 2016 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Fri Dec 23 11:40:56 2016 -0800
----------------------------------------------------------------------
.../scheduler/fair/AllocationFileLoaderService.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d3f73ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 3aecbfd..cd4a19b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -639,7 +639,7 @@ public class AllocationFileLoaderService extends AbstractService {
return defaultPermissions;
}
- interface Listener {
+ public interface Listener {
void onReload(AllocationConfiguration info) throws IOException;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[14/14] hadoop git commit: HDFS-11270. Document the missing options
of NameNode bootstrap command. Contributed by Yiqun Lin
Posted by xg...@apache.org.
HDFS-11270. Document the missing options of NameNode bootstrap command. Contributed by Yiqun Lin
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0e0ef29
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0e0ef29
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0e0ef29
Branch: refs/heads/YARN-5734
Commit: c0e0ef29696109af9a018462059f08fd99ee3121
Parents: 0665c5f
Author: Mingliang Liu <li...@apache.org>
Authored: Tue Dec 27 11:21:10 2016 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Tue Dec 27 11:21:10 2016 -0800
----------------------------------------------------------------------
.../hadoop/hdfs/server/common/HdfsServerConstants.java | 1 +
.../org/apache/hadoop/hdfs/server/namenode/NameNode.java | 5 ++++-
.../hadoop/hdfs/server/namenode/ha/BootstrapStandby.java | 10 ++++++++--
.../hadoop-hdfs/src/site/markdown/HDFSCommands.md | 4 ++--
4 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index d112a48..c9a46f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -156,6 +156,7 @@ public interface HdfsServerConstants {
RECOVER ("-recover"),
FORCE("-force"),
NONINTERACTIVE("-nonInteractive"),
+ SKIPSHAREDEDITSCHECK("-skipSharedEditsCheck"),
RENAMERESERVED("-renameReserved"),
METADATAVERSION("-metadataVersion"),
UPGRADEONLY("-upgradeOnly"),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index d825177..f6c724b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -307,7 +307,10 @@ public class NameNode extends ReconfigurableBase implements
+ RollingUpgradeStartupOption.getAllOptionString() + " ] | \n\t["
+ StartupOption.IMPORT.getName() + "] | \n\t["
+ StartupOption.INITIALIZESHAREDEDITS.getName() + "] | \n\t["
- + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | \n\t["
+ + StartupOption.BOOTSTRAPSTANDBY.getName() + " ["
+ + StartupOption.FORCE.getName() + "] ["
+ + StartupOption.NONINTERACTIVE.getName() + "] ["
+ + StartupOption.SKIPSHAREDEDITSCHECK.getName() + "] ] | \n\t["
+ StartupOption.RECOVER.getName() + " [ "
+ StartupOption.FORCE.getName() + "] ] | \n\t["
+ StartupOption.METADATAVERSION.getName() + " ]";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
index 5e0f416..4d6716f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -136,8 +136,14 @@ public class BootstrapStandby implements Tool, Configurable {
}
private void printUsage() {
- System.err.println("Usage: " + this.getClass().getSimpleName() +
- " [-force] [-nonInteractive] [-skipSharedEditsCheck]");
+ System.out.println("Usage: " + this.getClass().getSimpleName() +
+ " [-force] [-nonInteractive] [-skipSharedEditsCheck]\n"
+ + "\t-force: formats if the name directory exists.\n"
+ + "\t-nonInteractive: formats aborts if the name directory exists,\n"
+ + "\tunless -force option is specified.\n"
+ + "\t-skipSharedEditsCheck: skips edits check which ensures that\n"
+ + "\twe have enough edits already in the shared directory to start\n"
+ + "\tup from the last checkpoint on the active.");
}
private NamenodeProtocol createNNProtocolProxy(InetSocketAddress otherIpcAddr)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0e0ef29/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index a0d0ed7..7b00e9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -535,7 +535,7 @@ Usage:
[-rollingUpgrade <rollback |started> ] |
[-importCheckpoint] |
[-initializeSharedEdits] |
- [-bootstrapStandby] |
+ [-bootstrapStandby [-force] [-nonInteractive] [-skipSharedEditsCheck] ] |
[-recover [-force] ] |
[-metadataVersion ]
@@ -550,7 +550,7 @@ Usage:
| `-rollingUpgrade` \<rollback\|started\> | See [Rolling Upgrade document](./HdfsRollingUpgrade.html#NameNode_Startup_Options) for the detail. |
| `-importCheckpoint` | Loads image from a checkpoint directory and save it into the current one. Checkpoint dir is read from property dfs.namenode.checkpoint.dir |
| `-initializeSharedEdits` | Format a new shared edits dir and copy in enough edit log segments so that the standby NameNode can start up. |
-| `-bootstrapStandby` | Allows the standby NameNode's storage directories to be bootstrapped by copying the latest namespace snapshot from the active NameNode. This is used when first configuring an HA cluster. |
+| `-bootstrapStandby` `[-force]` `[-nonInteractive]` `[-skipSharedEditsCheck]` | Allows the standby NameNode's storage directories to be bootstrapped by copying the latest namespace snapshot from the active NameNode. This is used when first configuring an HA cluster. The -force and -nonInteractive options have the same meaning as described in the namenode -format command. The -skipSharedEditsCheck option skips the edits check, which ensures that there are enough edits already in the shared directory to start up from the last checkpoint on the active NameNode. |
| `-recover` `[-force]` | Recover lost metadata on a corrupt filesystem. See [HDFS User Guide](./HdfsUserGuide.html#Recovery_Mode) for the detail. |
| `-metadataVersion` | Verify that configured directories exist, then print the metadata versions of the software and the image. |
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[06/14] hadoop git commit: HDFS-10917. Collect peer performance
statistics on DataNode. Contributed by Xiaobing Zhou.
Posted by xg...@apache.org.
HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4e902965
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4e902965
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4e902965
Branch: refs/heads/YARN-5734
Commit: 4e9029653dfa7a803d73c173cb7044f7e0dc1eb1
Parents: e92a770
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Dec 22 23:46:58 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Dec 22 23:46:58 2016 -0800
----------------------------------------------------------------------
.../hadoop/metrics2/MetricsJsonBuilder.java | 125 +++++++++
.../lib/MutableRatesWithAggregation.java | 40 ++-
.../hadoop/metrics2/lib/RollingAverages.java | 251 +++++++++++++++++++
.../metrics2/lib/TestRollingAverages.java | 123 +++++++++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 +
.../hdfs/server/datanode/BlockReceiver.java | 48 +++-
.../hadoop/hdfs/server/datanode/DataNode.java | 14 +-
.../hdfs/server/datanode/DataNodeMXBean.java | 12 +
.../hdfs/server/datanode/DataXceiver.java | 9 +
.../datanode/metrics/DataNodePeerMetrics.java | 117 +++++++++
.../src/main/resources/hdfs-default.xml | 25 ++
.../datanode/TestDataNodePeerMetrics.java | 92 +++++++
12 files changed, 850 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
new file mode 100644
index 0000000..8e42909
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Build a JSON dump of the metrics.
+ *
+ * The {@link #toString()} method dumps out all values collected.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MetricsJsonBuilder extends MetricsRecordBuilder {
+
+ public static final Logger LOG = LoggerFactory
+      .getLogger(MetricsJsonBuilder.class);
+ private final MetricsCollector parent;
+ private Map<String, Object> innerMetrics = new LinkedHashMap<>();
+
+ /**
+ * Build an instance.
+ * @param parent parent collector. Unused in this instance; only used for
+ * the {@link #parent()} method
+ */
+ public MetricsJsonBuilder(MetricsCollector parent) {
+ this.parent = parent;
+ }
+
+ private MetricsRecordBuilder tuple(String key, Object value) {
+ innerMetrics.put(key, value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+ return tuple(info.name(), value);
+ }
+
+ @Override
+ public MetricsRecordBuilder add(MetricsTag tag) {
+ return tuple(tag.name(), tag.value());
+ }
+
+ @Override
+ public MetricsRecordBuilder add(AbstractMetric metric) {
+ return tuple(metric.info().name(), metric.toString());
+ }
+
+ @Override
+ public MetricsRecordBuilder setContext(String value) {
+ return tuple("context", value);
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+ return tuple(info.name(), value);
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+ return tuple(info.name(), value);
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+ return tuple(info.name(), value);
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+ return tuple(info.name(), value);
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+ return tuple(info.name(), value);
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+ return tuple(info.name(), value);
+ }
+
+ @Override
+ public MetricsCollector parent() {
+ return parent;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(innerMetrics);
+ } catch (IOException e) {
+ LOG.warn("Failed to dump to Json.", e);
+ return ExceptionUtils.getStackTrace(e);
+ }
+ }
+}
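A short usage sketch; the metric names and values are illustrative, and Interns.info is the existing metrics2 helper for creating MetricsInfo instances.

    // The parent collector is only used by parent(); null suffices for a
    // standalone dump, per the constructor javadoc above.
    MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
    builder.setContext("dfs")
        .addGauge(Interns.info("BytesWritten", "Bytes written"), 1024L)
        .addCounter(Interns.info("WriteOps", "Write operations"), 7);
    // e.g. {"context":"dfs","BytesWritten":1024,"WriteOps":7}
    String json = builder.toString();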
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
index 64eae03..9827ca7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.metrics2.lib;
import com.google.common.collect.Sets;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -50,7 +49,8 @@ import org.apache.hadoop.metrics2.util.SampleStat;
@InterfaceStability.Evolving
public class MutableRatesWithAggregation extends MutableMetric {
static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);
- private final Map<String, MutableRate> globalMetrics = new HashMap<>();
+ private final Map<String, MutableRate> globalMetrics =
+ new ConcurrentHashMap<>();
private final Set<Class<?>> protocolCache = Sets.newHashSet();
private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>>
@@ -107,12 +107,7 @@ public class MutableRatesWithAggregation extends MutableMetric {
// Thread has died; clean up its state
iter.remove();
} else {
- // Aggregate the thread's local samples into the global metrics
- for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
- String name = entry.getKey();
- MutableRate globalMetric = addMetricIfNotExists(name);
- entry.getValue().snapshotInto(globalMetric);
- }
+ aggregateLocalStatesToGlobalMetrics(map);
}
}
for (MutableRate globalMetric : globalMetrics.values()) {
@@ -120,6 +115,35 @@ public class MutableRatesWithAggregation extends MutableMetric {
}
}
+ /**
+ * Collects the state maintained in {@link ThreadLocal} variables, if any.
+ */
+ synchronized void collectThreadLocalStates() {
+ final ConcurrentMap<String, ThreadSafeSampleStat> localStats =
+ threadLocalMetricsMap.get();
+ if (localStats != null) {
+ aggregateLocalStatesToGlobalMetrics(localStats);
+ }
+ }
+
+ /**
+ * Aggregates the thread's local samples into the global metrics. The
+ * caller must ensure thread safety.
+ */
+ private void aggregateLocalStatesToGlobalMetrics(
+ final ConcurrentMap<String, ThreadSafeSampleStat> localStats) {
+ for (Map.Entry<String, ThreadSafeSampleStat> entry : localStats
+ .entrySet()) {
+ String name = entry.getKey();
+ MutableRate globalMetric = addMetricIfNotExists(name);
+ entry.getValue().snapshotInto(globalMetric);
+ }
+ }
+
+ Map<String, MutableRate> getGlobalMetrics() {
+ return globalMetrics;
+ }
+
private synchronized MutableRate addMetricIfNotExists(String name) {
MutableRate metric = globalMetrics.get(name);
if (metric == null) {
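The aggregation pattern above (per-thread sample maps drained into a shared concurrent map) can be sketched in isolation. Everything below is an illustrative stand-in, not the actual Hadoop classes.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    class PerThreadCounters {
      // Global view; safe for concurrent writers and readers.
      private final Map<String, LongAdder> global = new ConcurrentHashMap<>();
      // Each worker thread accumulates locally, avoiding lock contention.
      private final ThreadLocal<Map<String, Long>> local =
          ThreadLocal.withInitial(HashMap::new);

      void add(String name, long delta) {
        local.get().merge(name, delta, Long::sum);
      }

      // Called before a worker thread exits, mirroring the intent of
      // collectThreadLocalStates() above.
      void drain() {
        local.get().forEach((name, value) ->
            global.computeIfAbsent(name, k -> new LongAdder()).add(value));
        local.get().clear();
      }
    }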
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
new file mode 100644
index 0000000..06ae30d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.metrics2.lib.Interns.*;
+
+/**
+ * <p>
+ * This class maintains a group of rolling average metrics. It implements a
+ * rolling average algorithm: a fixed number of sliding windows roll over
+ * periodically, evicting old subsets of samples. Each window collects the
+ * sub-sum and sub-count of a subset of the sample stream. The sub-sums and
+ * sub-counts of all windows are aggregated into a final sum and count,
+ * whose quotient is the rolling average.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RollingAverages extends MutableMetric implements Closeable {
+
+ private final MutableRatesWithAggregation innerMetrics =
+ new MutableRatesWithAggregation();
+
+ private static final ScheduledExecutorService SCHEDULER = Executors
+ .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("RollingAverages-%d").build());
+
+ private ScheduledFuture<?> scheduledTask = null;
+ private Map<String, MutableRate> currentSnapshot;
+ private final int numWindows;
+ private final String avgInfoNameTemplate;
+ private final String avgInfoDescTemplate;
+
+ private static class SumAndCount {
+ private final double sum;
+ private final long count;
+
+ public SumAndCount(final double sum, final long count) {
+ this.sum = sum;
+ this.count = count;
+ }
+
+ public double getSum() {
+ return sum;
+ }
+
+ public long getCount() {
+ return count;
+ }
+ }
+
+ /**
+ * <p>
+ * key: metric name
+ * </p>
+ * <p>
+ * value: deque where the sub-sums and sub-counts for the sliding windows
+ * are maintained.
+ * </p>
+ */
+ private Map<String, LinkedBlockingDeque<SumAndCount>> averages =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Constructor of {@link RollingAverages}.
+ * @param windowSize
+ * The length of each window in seconds, i.e. the roll-over interval
+ * during which a subset of samples is gathered.
+ * @param numWindows
+ * The number of windows maintained to compute the rolling average.
+ * @param valueName
+ * The name of the value being averaged (e.g. "Time", "Latency").
+ */
+ public RollingAverages(
+ final int windowSize,
+ final int numWindows,
+ final String valueName) {
+ String uvName = StringUtils.capitalize(valueName);
+ String lvName = StringUtils.uncapitalize(valueName);
+ avgInfoNameTemplate = "%sRollingAvg" + uvName;
+ avgInfoDescTemplate = "Rolling average " + lvName + " for %s";
+ this.numWindows = numWindows;
+ scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
+ windowSize, windowSize, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Constructor of {@link RollingAverages}.
+ * @param windowSize
+ * The length of each window in seconds, i.e. the roll-over interval
+ * during which a subset of samples is gathered.
+ * @param numWindows
+ * The number of windows maintained to compute the rolling average.
+ */
+ public RollingAverages(
+ final int windowSize,
+ final int numWindows) {
+ this(windowSize, numWindows, "Time");
+ }
+
+ @Override
+ public void snapshot(MetricsRecordBuilder builder, boolean all) {
+ if (all || changed()) {
+ for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+ : averages.entrySet()) {
+ final String name = entry.getKey();
+ final MetricsInfo avgInfo = info(
+ String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
+ String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
+ double totalSum = 0;
+ long totalCount = 0;
+
+ for (final SumAndCount sumAndCount : entry.getValue()) {
+ totalCount += sumAndCount.getCount();
+ totalSum += sumAndCount.getSum();
+ }
+
+ if (totalCount != 0) {
+ builder.addGauge(avgInfo, totalSum / totalCount);
+ }
+ }
+ if (changed()) {
+ clearChanged();
+ }
+ }
+ }
+
+ /**
+ * Collects the state maintained in {@link ThreadLocal} variables, if any.
+ */
+ public void collectThreadLocalStates() {
+ innerMetrics.collectThreadLocalStates();
+ }
+
+ /**
+ * Adds a sample to the named metric.
+ * @param name
+ * name of the metric
+ * @param value
+ * value of the metric
+ */
+ public void add(final String name, final long value) {
+ innerMetrics.add(name, value);
+ }
+
+ private static class RatesRoller implements Runnable {
+ private final RollingAverages parent;
+
+ public RatesRoller(final RollingAverages parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void run() {
+ synchronized (parent) {
+ final MetricsCollectorImpl mc = new MetricsCollectorImpl();
+ final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
+ /*
+ * Snapshot all metrics whether they changed or not; if there have been
+ * no ops since the last snapshot, we still record 0.
+ */
+ parent.innerMetrics.snapshot(rb, true);
+ Preconditions.checkState(mc.getRecords().size() == 1,
+ "There must be only one record and it's named with 'RatesRoller'");
+
+ parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
+ parent.rollOverAvgs();
+ }
+ parent.setChanged();
+ }
+ }
+
+ /**
+ * Iterates over the snapshot to capture all average metrics into the
+ * rolling structure {@link RollingAverages#averages}.
+ */
+ private void rollOverAvgs() {
+ if (currentSnapshot == null) {
+ return;
+ }
+
+ for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
+ final MutableRate rate = entry.getValue();
+ final LinkedBlockingDeque<SumAndCount> deque = averages.computeIfAbsent(
+ entry.getKey(), k -> new LinkedBlockingDeque<>(numWindows));
+ final SumAndCount sumAndCount = new SumAndCount(
+ rate.lastStat().total(),
+ rate.lastStat().numSamples());
+ /* append the newest sum and count; if full, evict the oldest window */
+ if (!deque.offerLast(sumAndCount)) {
+ deque.pollFirst();
+ deque.offerLast(sumAndCount);
+ }
+ }
+
+ setChanged();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (scheduledTask != null) {
+ scheduledTask.cancel(false);
+ }
+ scheduledTask = null;
+ }
+}
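To make the snapshot arithmetic concrete, here is the same computation on invented numbers: two full windows whose (sum, count) pairs are aggregated into one rolling average, matching what TestRollingAverages below expects after its second iteration.

    // Invented data: window #1 saw 1000 samples of value 1 (sub-sum 1000),
    // window #2 saw 1000 samples of value 2 (sub-sum 2000).
    double[] sums = {1000.0, 2000.0};
    long[] counts = {1000L, 1000L};
    double totalSum = 0;
    long totalCount = 0;
    for (int i = 0; i < sums.length; i++) {
      totalSum += sums[i];
      totalCount += counts[i];
    }
    // Guard against empty windows, as snapshot() does with totalCount != 0.
    double rollingAvg = totalCount != 0 ? totalSum / totalCount : 0;
    // rollingAvg == 3000.0 / 2000 == 1.5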
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
new file mode 100644
index 0000000..899d98c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.eq;
+
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+
+/**
+ * This class tests various cases of the algorithms implemented in
+ * {@link RollingAverages}.
+ */
+public class TestRollingAverages {
+ /**
+ * Tests that no gauges are emitted when no samples have been inserted: a
+ * dry run of an empty roll-over.
+ */
+ @Test(timeout = 30000)
+ public void testRollingAveragesEmptyRollover() throws Exception {
+ final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+ /* 5s interval and 2 windows */
+ try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
+ /* Check it initially */
+ rollingAverages.snapshot(rb, true);
+ verify(rb, never()).addGauge(
+ info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+ verify(rb, never()).addGauge(
+ info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+
+ /* sleep 6s, longer than the 5s interval, to let the roll-over finish */
+ Thread.sleep(6000);
+ rollingAverages.snapshot(rb, false);
+ verify(rb, never()).addGauge(
+ info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+ verify(rb, never()).addGauge(
+ info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+ }
+ }
+
+ /**
+ * Tests the case:
+ * <p>
+ * 5s interval and 2 sliding windows
+ * </p>
+ * <p>
+ * sample stream: 1000 times 1, 2, and 3, respectively, e.g. [1, 1...1], [2,
+ * 2...2] and [3, 3...3]
+ * </p>
+ */
+ @Test(timeout = 30000)
+ public void testRollingAveragesRollover() throws Exception {
+ final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+ final String name = "foo2";
+ final int windowSize = 5; // 5s roll over interval
+ final int numWindows = 2;
+ final int numOpsPerIteration = 1000;
+ try (RollingAverages rollingAverages = new RollingAverages(windowSize,
+ numWindows)) {
+
+ /* Push values for three intervals */
+ final long start = Time.monotonicNow();
+ for (int i = 1; i <= 3; i++) {
+ /* insert value */
+ for (long j = 1; j <= numOpsPerIteration; j++) {
+ rollingAverages.add(name, i);
+ }
+
+ /*
+ * Sleep until 1s after the next windowSize-seconds boundary, to let the
+ * metrics roll over.
+ */
+ final long sleep = (start + (windowSize * 1000 * i) + 1000)
+ - Time.monotonicNow();
+ Thread.sleep(sleep);
+
+ /* Verify the window rolled over and holds the values we pushed in */
+ rollingAverages.snapshot(rb, false);
+
+ /*
+ * Window #1 holds a series of 1000 ones, e.g. [1, 1...1]; similarly,
+ * window #2 holds [2, 2...2] and window #3 holds [3, 3...3].
+ */
+ final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0)
+ + numOpsPerIteration * i;
+ /* one empty window or all 2 windows full */
+ final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
+ : numOpsPerIteration;
+ verify(rb).addGauge(
+ info("Foo2RollingAvgTime", "Rolling average time for foo2"),
+ rollingSum / rollingTotal);
+
+ /* Verify the metrics were added the right number of times */
+ verify(rb, times(i)).addGauge(
+ eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
+ anyDouble());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 15bb0bd..50217a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -457,6 +457,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_METRICS_SESSION_ID_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
+ public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
+ "dfs.metrics.rolling.average.window.size";
+ public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
+ 3600;
+ public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
+ "dfs.metrics.rolling.average.window.numbers";
+ public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
+ 48;
+ public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY =
+ "dfs.datanode.peer.stats.enabled";
+ public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
public static final String DFS_DATANODE_HOST_NAME_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 23cd44d..b3aee11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -93,6 +93,7 @@ class BlockReceiver implements Closeable {
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
+ private String bracketedMirrorAddr;
private DataOutputStream mirrorOut;
private Daemon responder = null;
private DataTransferThrottler throttler;
@@ -119,6 +120,7 @@ class BlockReceiver implements Closeable {
/** pipeline stage */
private final BlockConstructionStage stage;
private final boolean isTransfer;
+ private boolean isPenultimateNode = false;
private boolean syncOnClose;
private long restartBudget;
@@ -575,6 +577,7 @@ class BlockReceiver implements Closeable {
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
mirrorAddr,
duration);
+ trackSendPacketToLastNodeInPipeline(duration);
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -822,6 +825,33 @@ class BlockReceiver implements Closeable {
return lastPacketInBlock?-1:len;
}
+ /**
+ * Only tracks the latency of sending packets to the last node in the
+ * pipeline. This is a conscious design choice.
+ * <p>
+ * In the case of pipeline [dn0, dn1, dn2] with 5ms latency from dn0 to dn1
+ * and 100ms from dn1 to dn2, NameNode claims dn2 is slow since it sees
+ * 100ms latency to dn2. Note that NameNode is not aware of the pipeline
+ * structure in this context and only sees latency between two DataNodes.
+ * </p>
+ * <p>
+ * In another case of the same pipeline, with 100ms latency from dn0 to dn1
+ * and 5ms from dn1 to dn2, NameNode will miss detecting that dn1 is slow
+ * since it is not the last node. However, the assumption is that in a busy
+ * enough cluster there are many other pipelines where dn1 is the last
+ * node, e.g. [dn3, dn4, dn1]. Also, our tracking interval is long (at
+ * least an hour), which improves the chance of a bad DataNode being the
+ * last node in multiple pipelines.
+ * </p>
+ */
+ private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
+ if (isPenultimateNode && mirrorAddr != null) {
+ datanode.getPeerMetrics().addSendPacketDownstream(
+ bracketedMirrorAddr,
+ elapsedMs);
+ }
+ }
+
private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
return Arrays.copyOfRange(array, end - size, end);
}
@@ -886,7 +916,7 @@ class BlockReceiver implements Closeable {
((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
.getRestartOOBStatus());
}
-
+
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
@@ -895,14 +925,16 @@ class BlockReceiver implements Closeable {
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {
- syncOnClose = datanode.getDnConf().syncOnClose;
- boolean responderClosed = false;
- mirrorOut = mirrOut;
- mirrorAddr = mirrAddr;
- throttler = throttlerArg;
+ syncOnClose = datanode.getDnConf().syncOnClose;
+ boolean responderClosed = false;
+ mirrorOut = mirrOut;
+ mirrorAddr = mirrAddr;
+ bracketedMirrorAddr = "[" + mirrAddr + "]";
+ isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
+ throttler = throttlerArg;
- this.replyOut = replyOut;
- this.isReplaceBlock = isReplaceBlock;
+ this.replyOut = replyOut;
+ this.isReplaceBlock = isReplaceBlock;
try {
if (isClient && !isTransfer) {
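For clarity, the penultimate-node check introduced above can be read as follows for a pipeline [dn0, dn1, dn2]; the node names are illustrative.

    // Minimal sketch of the check, not the actual Hadoop code.
    static boolean isPenultimate(Object[] downstreams) {
      return downstreams != null && downstreams.length == 1;
    }
    // dn0's view: downstreams = {dn1, dn2} -> false
    // dn1's view: downstreams = {dn2}      -> true; dn1 tracks the latency
    //                                         of sending to dn2
    // dn2's view: downstreams = {}         -> false (last node, no mirror)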
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a94c4b1..4436e58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -165,6 +165,7 @@ import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
@@ -333,6 +334,7 @@ public class DataNode extends ReconfigurableBase
private int infoSecurePort;
DataNodeMetrics metrics;
+ private DataNodePeerMetrics peerMetrics;
private InetSocketAddress streamingAddr;
// See the note below in incrDatanodeNetworkErrors re: concurrency.
@@ -1360,6 +1362,7 @@ public class DataNode extends ReconfigurableBase
initIpcServer();
metrics = DataNodeMetrics.create(getConf(), getDisplayName());
+ peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
ecWorker = new ErasureCodingWorker(getConf(), this);
@@ -1755,11 +1758,15 @@ public class DataNode extends ReconfigurableBase
throw new IOException(ie.getMessage());
}
}
-
+
public DataNodeMetrics getMetrics() {
return metrics;
}
+ public DataNodePeerMetrics getPeerMetrics() {
+ return peerMetrics;
+ }
+
/** Ensure the authentication method is kerberos */
private void checkKerberosAuthMethod(String msg) throws IOException {
// User invoking the call must be same as the datanode user
@@ -3437,4 +3444,9 @@ public class DataNode extends ReconfigurableBase
void setBlockScanner(BlockScanner blockScanner) {
this.blockScanner = blockScanner;
}
+
+ @Override // DataNodeMXBean
+ public String getSendPacketDownstreamAvgInfo() {
+ return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 37f9635..ccc5f92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -125,4 +125,16 @@ public interface DataNodeMXBean {
* Gets the {@link FileIoProvider} statistics.
*/
String getFileIoProviderStatistics();
+
+ /**
+ * Gets the average info (e.g. time) of SendPacketDownstream when the
+ * DataNode acts as the penultimate (second-to-last) node in the pipeline.
+ * <p>
+ * Example Json:
+ * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+ * "[49.236.149.246:9801]RollingAvgTime":504.463,
+ * "[84.125.113.65:9801]RollingAvgTime":497.954}
+ * </p>
+ */
+ String getSendPacketDownstreamAvgInfo();
}
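For operators, one way to read this attribute is through JMX. The MBean object name "Hadoop:service=DataNode,name=DataNodeInfo" and the attribute name derived from the getter are assumptions in this sketch; verify them against your deployment before relying on it.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class PeerAvgInfoReader {
      public static void main(String[] args) throws Exception {
        // Uses the platform MBean server, so this must run inside the
        // DataNode JVM; use a JMXConnector for remote access instead.
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName dn =
            new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
        String json =
            (String) mbs.getAttribute(dn, "SendPacketDownstreamAvgInfo");
        System.out.println(json);
        // e.g. {"[185.164.159.81:9801]RollingAvgTime":504.867, ...}
      }
    }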
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a35a5b4..abcaa4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -323,6 +323,7 @@ class DataXceiver extends Receiver implements Runnable {
LOG.error(s, t);
}
} finally {
+ collectThreadLocalStates();
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount());
@@ -335,6 +336,14 @@ class DataXceiver extends Receiver implements Runnable {
}
}
+ /**
+ * Since this is a short-lived thread, its thread-local state should be
+ * collected before the thread exits.
+ */
+ private void collectThreadLocalStates() {
+ datanode.getPeerMetrics().collectThreadLocalStates();
+ }
+
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
new file mode 100644
index 0000000..9344d1b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsJsonBuilder;
+import org.apache.hadoop.metrics2.lib.RollingAverages;
+
+/**
+ * This class maintains DataNode peer metrics (e.g. numOps, AvgTime) for
+ * various peer operations.
+ */
+@InterfaceAudience.Private
+public class DataNodePeerMetrics {
+
+ static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
+
+ private final RollingAverages sendPacketDownstreamRollingAverages;
+
+ private final String name;
+ private final boolean peerStatsEnabled;
+
+ public DataNodePeerMetrics(
+ final String name,
+ final int windowSize,
+ final int numWindows,
+ final boolean peerStatsEnabled) {
+ this.name = name;
+ this.peerStatsEnabled = peerStatsEnabled;
+ sendPacketDownstreamRollingAverages = new RollingAverages(
+ windowSize,
+ numWindows);
+ }
+
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Creates an instance of DataNodePeerMetrics, used for registration.
+ */
+ public static DataNodePeerMetrics create(Configuration conf, String dnName) {
+ final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
+ ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
+ : dnName.replace(':', '-'));
+
+ final int windowSize = conf.getInt(
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
+ final int numWindows = conf.getInt(
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
+ final boolean peerStatsEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+ DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+
+ return new DataNodePeerMetrics(
+ name,
+ windowSize,
+ numWindows,
+ peerStatsEnabled);
+ }
+
+ /**
+ * Adds invocation and elapsed time of SendPacketDownstream for peer.
+ * <p>
+ * The caller should pass in a well-formatted peerAddr, e.g.
+ * "[192.168.1.110:1010]". This will be translated into a fully
+ * qualified metric name, e.g. "[192.168.1.110:1010]RollingAvgTime".
+ * </p>
+ */
+ public void addSendPacketDownstream(
+ final String peerAddr,
+ final long elapsedMs) {
+ if (peerStatsEnabled) {
+ sendPacketDownstreamRollingAverages.add(peerAddr, elapsedMs);
+ }
+ }
+
+ /**
+ * Dump SendPacketDownstreamRollingAvgTime metrics as JSON.
+ */
+ public String dumpSendPacketDownstreamAvgInfoAsJson() {
+ final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
+ sendPacketDownstreamRollingAverages.snapshot(builder, true);
+ return builder.toString();
+ }
+
+ /**
+ * Collects the state maintained in {@link ThreadLocal} variables, if any.
+ */
+ public void collectThreadLocalStates() {
+ sendPacketDownstreamRollingAverages.collectThreadLocalStates();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 086f667..3389d84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1972,6 +1972,31 @@
</property>
<property>
+ <name>dfs.datanode.peer.stats.enabled</name>
+ <value>false</value>
+ <description>
+ A switch to turn on/off tracking DataNode peer statistics.
+ </description>
+</property>
+
+<property>
+ <name>dfs.metrics.rolling.average.window.size</name>
+ <value>3600</value>
+ <description>
+ The length of each window in seconds, i.e. the roll-over interval during
+ which a subset of samples is gathered to compute the rolling average.
+ </description>
+</property>
+
+<property>
+ <name>dfs.metrics.rolling.average.window.numbers</name>
+ <value>48</value>
+ <description>
+ The number of windows maintained to compute the rolling average.
+ </description>
+</property>
+
+<property>
<name>hadoop.user.group.metrics.percentiles.intervals</name>
<value></value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
new file mode 100644
index 0000000..5af54a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+/**
+ * This class tests various cases of DataNode peer metrics.
+ */
+public class TestDataNodePeerMetrics {
+
+ @Test(timeout = 30000)
+ public void testGetSendPacketDownstreamAvgInfo() throws Exception {
+ final int windowSize = 5; // 5s roll over interval
+ final int numWindows = 2; // 2 rolling windows
+ final int iterations = 3;
+ final int numOpsPerIteration = 1000;
+
+ final Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+ windowSize);
+ conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+ numWindows);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
+
+ final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
+ conf,
+ "Sample-DataNode");
+ final long start = Time.monotonicNow();
+ for (int i = 1; i <= iterations; i++) {
+ final String peerAddr = genPeerAddress();
+ for (int j = 1; j <= numOpsPerIteration; j++) {
+ /* simulate latencies of 1 to 1000 ms */
+ final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
+ peerMetrics.addSendPacketDownstream(peerAddr, latency);
+ }
+
+ /*
+ * Sleep until 1s after the next windowSize-seconds boundary, to let the
+ * metrics roll over.
+ */
+ final long sleep = (start + (windowSize * 1000 * i) + 1000)
+ - Time.monotonicNow();
+ Thread.sleep(sleep);
+
+ /* dump avg info */
+ final String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+ /*
+ * example json:
+ * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+ * "[49.236.149.246:9801]RollingAvgTime":504.463,
+ * "[84.125.113.65:9801]RollingAvgTime":497.954}
+ */
+ assertThat(json, containsString(peerAddr));
+ }
+ }
+
+ /**
+ * Generates random peer addresses to simulate peers, e.g. [84.125.113.65:9801].
+ */
+ private String genPeerAddress() {
+ final ThreadLocalRandom r = ThreadLocalRandom.current();
+ return String.format("[%d.%d.%d.%d:9801]",
+ r.nextInt(1, 256), r.nextInt(1, 256),
+ r.nextInt(1, 256), r.nextInt(1, 256));
+ }
+}
[04/14] hadoop git commit: MAPREDUCE-6704. Update the documents to
run MapReduce application. Contributed by Bibin A Chundatt.
Posted by xg...@apache.org.
MAPREDUCE-6704. Update the documents to run MapReduce application. Contributed by Bibin A Chundatt.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22befbd5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22befbd5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22befbd5
Branch: refs/heads/YARN-5734
Commit: 22befbd585f65934e1d9ae5782a8f961192c0750
Parents: 38e66d4
Author: Akira Ajisaka <aa...@apache.org>
Authored: Fri Dec 23 04:48:04 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Fri Dec 23 04:48:04 2016 +0900
----------------------------------------------------------------------
.../hadoop-common/src/site/markdown/ClusterSetup.md | 1 +
.../hadoop-common/src/site/markdown/SingleCluster.md.vm | 12 ++++--------
2 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22befbd5/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md b/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
index 56b43e6..e2ccbf0 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md
@@ -156,6 +156,7 @@ This section deals with important parameters to be specified in the given config
| `yarn.nodemanager.remote-app-log-dir` | */logs* | HDFS directory where the application logs are moved on application completion. Need to set appropriate permissions. Only applicable if log-aggregation is enabled. |
| `yarn.nodemanager.remote-app-log-dir-suffix` | *logs* | Suffix appended to the remote log dir. Logs will be aggregated to ${yarn.nodemanager.remote-app-log-dir}/${user}/${thisParam} Only applicable if log-aggregation is enabled. |
| `yarn.nodemanager.aux-services` | mapreduce\_shuffle | Shuffle service that needs to be set for Map Reduce applications. |
+| `yarn.nodemanager.env-whitelist` | Environment properties to be inherited by containers from NodeManagers | For MapReduce applications, HADOOP\_MAPRED\_HOME should be added in addition to the default values. The property value should be JAVA\_HOME,HADOOP\_COMMON\_HOME,HADOOP\_HDFS\_HOME,HADOOP\_CONF\_DIR,CLASSPATH\_PREPEND\_DISTCACHE,HADOOP\_YARN\_HOME,HADOOP\_MAPRED\_HOME |
* Configurations for History Server (Needs to be moved elsewhere):
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22befbd5/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm b/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
index 4825e00..fb4df9a 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/SingleCluster.md.vm
@@ -190,14 +190,6 @@ The following instructions assume that 1. ~ 4. steps of [the above instructions]
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
- <property>
- <name>mapreduce.admin.user.env</name>
- <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
- </property>
- <property>
- <name>yarn.app.mapreduce.am.env</name>
- <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
- </property>
</configuration>
`etc/hadoop/yarn-site.xml`:
@@ -207,6 +199,10 @@ The following instructions assume that 1. ~ 4. steps of [the above instructions]
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
+ <property>
+ <name>yarn.nodemanager.env-whitelist</name>
+ <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
+ </property>
</configuration>
2. Start ResourceManager daemon and NodeManager daemon:
[11/14] hadoop git commit: HDFS-11272. Refine the assert messages in
TestFSDirAttrOp. Contributed by Jimmy Xiang.
Posted by xg...@apache.org.
HDFS-11272. Refine the assert messages in TestFSDirAttrOp. Contributed by Jimmy Xiang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea547529
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea547529
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea547529
Branch: refs/heads/YARN-5734
Commit: ea547529cb17c931a756ebffa6942f71f761ad13
Parents: 8f218ea
Author: Akira Ajisaka <aa...@apache.org>
Authored: Mon Dec 26 17:15:45 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Mon Dec 26 17:15:45 2016 +0900
----------------------------------------------------------------------
.../hadoop/hdfs/server/namenode/TestFSDirAttrOp.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea547529/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
index 8cd68a1..44cf57a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirAttrOp.java
@@ -52,25 +52,25 @@ public class TestFSDirAttrOp {
@Test
public void testUnprotectedSetTimes() throws Exception {
// atime < access time + precision
- assertFalse("SetTimes should not update access time"
+ assertFalse("SetTimes should not update access time "
+ "because it's within the last precision interval",
unprotectedSetTimes(100, 0, 1000, -1, false));
// atime = access time + precision
- assertFalse("SetTimes should not update access time"
+ assertFalse("SetTimes should not update access time "
+ "because it's within the last precision interval",
unprotectedSetTimes(1000, 0, 1000, -1, false));
// atime > access time + precision
- assertTrue("SetTimes should store access time",
+ assertTrue("SetTimes should update access time",
unprotectedSetTimes(1011, 10, 1000, -1, false));
// atime < access time + precision, but force is set
- assertTrue("SetTimes should store access time",
+ assertTrue("SetTimes should update access time",
unprotectedSetTimes(100, 0, 1000, -1, true));
// atime < access time + precision, but mtime is set
- assertTrue("SetTimes should store access time",
+ assertTrue("SetTimes should update access time",
unprotectedSetTimes(100, 0, 1000, 1, false));
}
}
[13/14] hadoop git commit: HADOOP-13943.
TestCommonConfigurationFields#testCompareXmlAgainstConfigurationClass fails
after HADOOP-13863. Contributed by Brahma Reddy Battula.
Posted by xg...@apache.org.
HADOOP-13943. TestCommonConfigurationFields#testCompareXmlAgainstConfigurationClass fails after HADOOP-13863. Contributed by Brahma Reddy Battula.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0665c5f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0665c5f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0665c5f0
Branch: refs/heads/YARN-5734
Commit: 0665c5f09afb2d6f59b6c4d4980f8b1ff9cbe620
Parents: ded2d08
Author: Akira Ajisaka <aa...@apache.org>
Authored: Wed Dec 28 01:39:30 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Wed Dec 28 01:39:30 2016 +0900
----------------------------------------------------------------------
.../org/apache/hadoop/conf/TestCommonConfigurationFields.java | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0665c5f0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 72316ad..571dfae 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -106,6 +106,13 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
xmlPropsToSkipCompare.add("fs.adl.impl");
xmlPropsToSkipCompare.add("fs.AbstractFileSystem.adl.impl");
+ // Azure properties are in a different class
+ // - org.apache.hadoop.fs.azure.AzureNativeFileSystemStore
+ // - org.apache.hadoop.fs.azure.SASKeyGeneratorImpl
+ xmlPropsToSkipCompare.add("fs.azure.sas.expiry.period");
+ xmlPropsToSkipCompare.add("fs.azure.local.sas.key.mode");
+ xmlPropsToSkipCompare.add("fs.azure.secure.mode");
+
// Deprecated properties. These should eventually be removed from the
// class.
configurationPropsToSkipCompare
[10/14] hadoop git commit: HDFS-11250. Fix a typo in
ReplicaUnderRecovery#setRecoveryID. Contributed by Yiqun Lin.
Posted by xg...@apache.org.
HDFS-11250. Fix a typo in ReplicaUnderRecovery#setRecoveryID. Contributed by Yiqun Lin.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f218ea2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f218ea2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f218ea2
Branch: refs/heads/YARN-5734
Commit: 8f218ea2849477bdcb4918586aaefed0cd118341
Parents: 483cd06
Author: Akira Ajisaka <aa...@apache.org>
Authored: Mon Dec 26 16:51:29 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Mon Dec 26 16:51:29 2016 +0900
----------------------------------------------------------------------
.../apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f218ea2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 09140e7..324a8ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -65,7 +65,7 @@ public class ReplicaUnderRecovery extends LocalReplica {
if (recoveryId > this.recoveryId) {
this.recoveryId = recoveryId;
} else {
- throw new IllegalArgumentException("The new rcovery id: " + recoveryId
+ throw new IllegalArgumentException("The new recovery id: " + recoveryId
+ " must be greater than the current one: " + this.recoveryId);
}
}
[09/14] hadoop git commit: HDFS-11271. Typo in NameNode UI.
Contributed by Wei-Chiu Chuang.
Posted by xg...@apache.org.
HDFS-11271. Typo in NameNode UI. Contributed by Wei-Chiu Chuang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/483cd06a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/483cd06a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/483cd06a
Branch: refs/heads/YARN-5734
Commit: 483cd06ad43b85e00a782fe225f1f40657bab204
Parents: c721f78
Author: Akira Ajisaka <aa...@apache.org>
Authored: Mon Dec 26 16:36:54 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Mon Dec 26 16:36:54 2016 +0900
----------------------------------------------------------------------
.../hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/483cd06a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
index 3598c80..d71e798 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
@@ -152,11 +152,11 @@
{/fs}
</p>
{#mem.HeapMemoryUsage}
-<p>Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Heap Memory. Max Heap Memory is {@eq key=max value="-1" type="number"}<unbonded>{:else}{max|fmt_bytes}{/eq}.</p>
+<p>Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Heap Memory. Max Heap Memory is {@eq key=max value="-1" type="number"}<unbounded>{:else}{max|fmt_bytes}{/eq}.</p>
{/mem.HeapMemoryUsage}
{#mem.NonHeapMemoryUsage}
-<p>Non Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Commited Non Heap Memory. Max Non Heap Memory is {@eq key=max value="-1" type="number"}<unbonded>{:else}{max|fmt_bytes}{/eq}.</p>
+<p>Non Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Commited Non Heap Memory. Max Non Heap Memory is {@eq key=max value="-1" type="number"}<unbounded>{:else}{max|fmt_bytes}{/eq}.</p>
{/mem.NonHeapMemoryUsage}
{#nn}
[08/14] hadoop git commit: YARN-6026. A couple of spelling errors in
the docs. Contributed by Grant Sohn.
Posted by xg...@apache.org.
YARN-6026. A couple of spelling errors in the docs. Contributed by Grant Sohn.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c721f78a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c721f78a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c721f78a
Branch: refs/heads/YARN-5734
Commit: c721f78a3c6f560dff21ca966443551aba7541a4
Parents: 4d3f73a
Author: Naganarasimha <na...@apache.org>
Authored: Sat Dec 24 05:38:09 2016 +0530
Committer: Naganarasimha <na...@apache.org>
Committed: Sat Dec 24 05:38:09 2016 +0530
----------------------------------------------------------------------
.../hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md | 2 +-
.../hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c721f78a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
index a72d7e4..94d075b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
@@ -2286,7 +2286,7 @@ The *maximum-resource-capabilites* object contains the following elements:
| Item | Data Type | Description |
|:---- |:---- |:---- |
-| memory | int | The maxiumim memory available for a container |
+| memory | int | The maximum memory available for a container |
| vCores | int | The maximum number of cores available for a container |
### Response Examples
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c721f78a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
index 6b7bd08..e801e14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
@@ -334,7 +334,7 @@ After creating the timeline client, user also needs to set the timeline collecto
amRMClient.registerTimelineClient(timelineClient);
-Else address needs to be retreived from the AM allocate response and need to be set in timeline client explicitly.
+Else the address needs to be retrieved from the AM allocate response and set in the timeline client explicitly.
timelineClient.setTimelineServiceAddress(response.getCollectorAddr());
[03/14] hadoop git commit: YARN-5903. Fix race condition in
TestResourceManagerAdministrationProtocolPBClientImpl beforeclass setup
method (Haibo Chen via Varun Saxena)
Posted by xg...@apache.org.
YARN-5903. Fix race condition in TestResourceManagerAdministrationProtocolPBClientImpl beforeclass setup method (Haibo Chen via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/38e66d4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/38e66d4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/38e66d4d
Branch: refs/heads/YARN-5734
Commit: 38e66d4d64f3c2e2bb43d8e5dca3866d672322b6
Parents: 56a13a6
Author: Varun Saxena <va...@apache.org>
Authored: Thu Dec 22 23:08:33 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Thu Dec 22 23:08:33 2016 +0530
----------------------------------------------------------------------
...nagerAdministrationProtocolPBClientImpl.java | 33 ++++++++++++++------
1 file changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/38e66d4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
index c3dd93d..fd83028 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceManagerAdministrationProtocolPBClientImpl.java
@@ -19,11 +19,15 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -77,22 +82,30 @@ public class TestResourceManagerAdministrationProtocolPBClientImpl {
protected void doSecureLogin() throws IOException {
}
};
+
+ // a reliable way to wait for resource manager to fully start
+ final CountDownLatch rmStartedSignal = new CountDownLatch(1);
+ ServiceStateChangeListener rmStateChangeListener =
+ new ServiceStateChangeListener() {
+ @Override
+ public void stateChanged(Service service) {
+ if (service.getServiceState() == STATE.STARTED) {
+ rmStartedSignal.countDown();
+ }
+ }
+ };
+ resourceManager.registerServiceListener(rmStateChangeListener);
+
resourceManager.init(configuration);
new Thread() {
public void run() {
resourceManager.start();
}
}.start();
- int waitCount = 0;
- while (resourceManager.getServiceState() == STATE.INITED
- && waitCount++ < 10) {
- LOG.info("Waiting for RM to start...");
- Thread.sleep(1000);
- }
- if (resourceManager.getServiceState() != STATE.STARTED) {
- throw new IOException("ResourceManager failed to start. Final state is "
- + resourceManager.getServiceState());
- }
+
+ boolean rmStarted = rmStartedSignal.await(60000L, TimeUnit.MILLISECONDS);
+ Assert.assertTrue("ResourceManager failed to start up.", rmStarted);
+
LOG.info("ResourceManager RMAdmin address: "
+ configuration.get(YarnConfiguration.RM_ADMIN_ADDRESS));
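The listener-plus-latch pattern above generalizes to any Hadoop Service. A minimal sketch, assuming only the org.apache.hadoop.service API used in the diff (the helper class name is hypothetical):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.service.Service;
    import org.apache.hadoop.service.ServiceStateChangeListener;

    /** Hypothetical helper: block until a service reaches STARTED. */
    final class ServiceStartWaiter {
      static boolean awaitStarted(Service service, long timeoutMillis)
          throws InterruptedException {
        final CountDownLatch started = new CountDownLatch(1);
        ServiceStateChangeListener listener = new ServiceStateChangeListener() {
          @Override
          public void stateChanged(Service s) {
            if (s.getServiceState() == Service.STATE.STARTED) {
              started.countDown();
            }
          }
        };
        // Register before sampling the state so a transition between the
        // sample and the registration cannot be missed.
        service.registerServiceListener(listener);
        try {
          if (service.getServiceState() == Service.STATE.STARTED) {
            return true; // started before we began waiting
          }
          return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
          service.unregisterServiceListener(listener);
        }
      }
    }

Unlike the removed one-second polling loop, the latch wakes the caller the moment the state flips and puts an explicit upper bound on the wait.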
[02/14] hadoop git commit: HDFS-11216. Add remoteBytesRead counter
metrics for erasure coding reconstruction task. Contributed by Sammi Chen
Posted by xg...@apache.org.
HDFS-11216. Add remoteBytesRead counter metrics for erasure coding reconstruction task. Contributed by Sammi Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56a13a6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56a13a6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56a13a6a
Branch: refs/heads/YARN-5734
Commit: 56a13a6a59cb128cf6fdac78a074faf7e5603967
Parents: ae40153
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Dec 22 14:18:54 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Thu Dec 22 14:18:54 2016 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/test/MetricsAsserts.java | 7 +++++++
.../erasurecode/StripedBlockReader.java | 11 +++++++---
.../erasurecode/StripedBlockReconstructor.java | 1 +
.../erasurecode/StripedReconstructor.java | 14 +++++++++++--
.../datanode/metrics/DataNodeMetrics.java | 6 ++++++
.../TestDataNodeErasureCodingMetrics.java | 21 +++++++++++++++++++-
6 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
index 5d87b07..a7bbe84 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
@@ -236,6 +236,13 @@ public class MetricsAsserts {
return captor.getValue();
}
+ public static long getLongCounterWithoutCheck(String name,
+ MetricsRecordBuilder rb) {
+ ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
+ verify(rb, atLeast(0)).addCounter(eqName(info(name, "")), captor.capture());
+ return captor.getValue();
+ }
+
public static String getStringMetric(String name, MetricsRecordBuilder rb) {
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());
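A hedged usage sketch of the new helper next to the existing checked getter (the datanode variable is assumed; the metric name is taken from the test further below):

    // Assumed static imports from org.apache.hadoop.test.MetricsAsserts:
    //   getMetrics, getLongCounter, getLongCounterWithoutCheck
    MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
    // Checked getter: also enforces that the counter was recorded.
    long total = getLongCounter("EcReconstructionBytesRead", rb);
    // Unchecked getter: verify(rb, atLeast(0)) tolerates any invocation
    // count, suiting counters that may legitimately stay untouched.
    long remote =
        getLongCounterWithoutCheck("EcReconstructionRemoteBytesRead", rb);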
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 0f7c5c7..556158c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -65,6 +65,7 @@ class StripedBlockReader {
private final DatanodeInfo source;
private BlockReader blockReader;
private ByteBuffer buffer;
+ private boolean isLocal;
StripedBlockReader(StripedReader stripedReader, DataNode datanode,
Configuration conf, short index, ExtendedBlock block,
@@ -76,6 +77,7 @@ class StripedBlockReader {
this.index = index;
this.source = source;
this.block = block;
+ this.isLocal = false;
BlockReader tmpBlockReader = createBlockReader(offsetInBlock);
if (tmpBlockReader != null) {
@@ -116,10 +118,13 @@ class StripedBlockReader {
*
* TODO: add proper tracer
*/
+ Peer peer = newConnectedPeer(block, dnAddr, blockToken, source);
+ if (peer.isLocal()) {
+ this.isLocal = true;
+ }
return BlockReaderRemote.newBlockReader(
"dummy", block, blockToken, offsetInBlock,
- block.getNumBytes() - offsetInBlock, true,
- "", newConnectedPeer(block, dnAddr, blockToken, source), source,
+ block.getNumBytes() - offsetInBlock, true, "", peer, source,
null, stripedReader.getCachingStrategy(), datanode.getTracer(), -1);
} catch (IOException e) {
LOG.info("Exception while creating remote block reader, datanode {}",
@@ -187,7 +192,7 @@ class StripedBlockReader {
break;
}
n += nread;
- stripedReader.getReconstructor().incrBytesRead(nread);
+ stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index 5554d68..a1da536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -70,6 +70,7 @@ class StripedBlockReconstructor extends StripedReconstructor
final DataNodeMetrics metrics = getDatanode().getMetrics();
metrics.incrECReconstructionTasks();
metrics.incrECReconstructionBytesRead(getBytesRead());
+ metrics.incrECReconstructionRemoteBytesRead(getRemoteBytesRead());
metrics.incrECReconstructionBytesWritten(getBytesWritten());
getStripedReader().close();
stripedWriter.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 68769f7..cd17864 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -118,6 +118,7 @@ abstract class StripedReconstructor {
// metrics
private AtomicLong bytesRead = new AtomicLong(0);
private AtomicLong bytesWritten = new AtomicLong(0);
+ private AtomicLong remoteBytesRead = new AtomicLong(0);
StripedReconstructor(ErasureCodingWorker worker,
StripedReconstructionInfo stripedReconInfo) {
@@ -138,8 +139,13 @@ abstract class StripedReconstructor {
positionInBlock = 0L;
}
- public void incrBytesRead(long delta) {
- bytesRead.addAndGet(delta);
+ public void incrBytesRead(boolean local, long delta) {
+ if (local) {
+ bytesRead.addAndGet(delta);
+ } else {
+ bytesRead.addAndGet(delta);
+ remoteBytesRead.addAndGet(delta);
+ }
}
public void incrBytesWritten(long delta) {
@@ -150,6 +156,10 @@ abstract class StripedReconstructor {
return bytesRead.get();
}
+ public long getRemoteBytesRead() {
+ return remoteBytesRead.get();
+ }
+
public long getBytesWritten() {
return bytesWritten.get();
}
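Read together, the accounting reduces to one invariant: every byte read counts toward bytesRead, and bytes read from a non-local peer additionally count toward remoteBytesRead, so the remote counter is always a subset of the total. A condensed sketch of just that logic (class name hypothetical, behavior matching the diff above):

    import java.util.concurrent.atomic.AtomicLong;

    /** Hypothetical condensation of the reconstructor's read accounting. */
    class ReadAccounting {
      private final AtomicLong bytesRead = new AtomicLong(0);
      private final AtomicLong remoteBytesRead = new AtomicLong(0);

      void incrBytesRead(boolean local, long delta) {
        bytesRead.addAndGet(delta);         // every read, local or remote
        if (!local) {
          remoteBytesRead.addAndGet(delta); // remote reads only
        }
      }

      long getBytesRead() { return bytesRead.get(); }
      long getRemoteBytesRead() { return remoteBytesRead.get(); }
    }

This is also why the test below can assert a remote-read count of zero: on a single-host MiniDFSCluster every source peer reports isLocal(), so only bytesRead grows.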
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index e09a85f..0d82fed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -140,6 +140,8 @@ public class DataNodeMetrics {
MutableCounterLong ecReconstructionBytesRead;
@Metric("Bytes written by erasure coding worker")
MutableCounterLong ecReconstructionBytesWritten;
+ @Metric("Bytes remote read by erasure coding worker")
+ MutableCounterLong ecReconstructionRemoteBytesRead;
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@@ -459,6 +461,10 @@ public class DataNodeMetrics {
ecReconstructionBytesRead.incr(bytes);
}
+ public void incrECReconstructionRemoteBytesRead(long bytes) {
+ ecReconstructionRemoteBytesRead.incr(bytes);
+ }
+
public void incrECReconstructionBytesWritten(long bytes) {
ecReconstructionBytesWritten.incr(bytes);
}
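For readers new to metrics2, a minimal sketch of the declare-and-increment pattern used here (class, record, and metric names are hypothetical; DataNodeMetrics itself is registered with the metrics system elsewhere in the datanode):

    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    @Metrics(about = "Example worker metrics", context = "dfs")
    class ExampleWorkerMetrics {
      // The @Metric annotation ties the counter to this source's record;
      // the field is populated when the source is registered.
      @Metric("Bytes remotely read by an example worker")
      MutableCounterLong exampleRemoteBytesRead;

      static ExampleWorkerMetrics create() {
        return DefaultMetricsSystem.instance().register(
            "ExampleWorkerMetrics", "Example worker metrics",
            new ExampleWorkerMetrics());
      }

      void incrRemoteBytesRead(long bytes) {
        exampleRemoteBytesRead.incr(bytes);
      }
    }

At reconstruction time the worker only needs the incr(long) call, mirroring incrECReconstructionRemoteBytesRead above.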
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56a13a6a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
index 64ddbd7..7e64214 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounterWithoutCheck;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -98,6 +99,8 @@ public class TestDataNodeErasureCodingMetrics {
blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
Assert.assertEquals("EcReconstructionBytesWritten should be ",
blockSize, getLongMetric("EcReconstructionBytesWritten"));
+ Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+ 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
}
// A partial block, reconstruct the partial block
@@ -110,6 +113,8 @@ public class TestDataNodeErasureCodingMetrics {
fileLen, getLongMetric("EcReconstructionBytesRead"));
Assert.assertEquals("EcReconstructionBytesWritten should be ",
fileLen, getLongMetric("EcReconstructionBytesWritten"));
+ Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+ 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
}
// 1 full block + 5 partial block, reconstruct the full block
@@ -121,8 +126,10 @@ public class TestDataNodeErasureCodingMetrics {
Assert.assertEquals("ecReconstructionBytesRead should be ",
cellSize * dataBlocks + cellSize + cellSize / 10,
getLongMetric("EcReconstructionBytesRead"));
- Assert.assertEquals("ecReconstructionBytesWritten should be ",
+ Assert.assertEquals("EcReconstructionBytesWritten should be ",
blockSize, getLongMetric("EcReconstructionBytesWritten"));
+ Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+ 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
}
// 1 full block + 5 partial block, reconstruct the partial block
@@ -137,6 +144,8 @@ public class TestDataNodeErasureCodingMetrics {
Assert.assertEquals("ecReconstructionBytesWritten should be ",
cellSize + cellSize / 10,
getLongMetric("EcReconstructionBytesWritten"));
+ Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+ 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
}
private long getLongMetric(String metricName) {
@@ -149,6 +158,16 @@ public class TestDataNodeErasureCodingMetrics {
return metricValue;
}
+ private long getLongMetricWithoutCheck(String metricName) {
+ long metricValue = 0;
+ // Add all reconstruction metric value from all data nodes
+ for (DataNode dn : cluster.getDataNodes()) {
+ MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+ metricValue += getLongCounterWithoutCheck(metricName, rb);
+ }
+ return metricValue;
+ }
+
private void doTest(String fileName, int fileLen,
int deadNodeIndex) throws Exception {
assertTrue(fileLen > 0);
[12/14] hadoop git commit: HADOOP-13940. Document the missing envvars
commands (Contributed by Yiqun Lin via Daniel Templeton)
Posted by xg...@apache.org.
HADOOP-13940. Document the missing envvars commands (Contributed by Yiqun Lin via Daniel Templeton)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ded2d08f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ded2d08f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ded2d08f
Branch: refs/heads/YARN-5734
Commit: ded2d08f33c25daf17dbf3e5ff0ddfcf9980c6aa
Parents: ea54752
Author: Daniel Templeton <te...@apache.org>
Authored: Tue Dec 27 07:16:37 2016 -0800
Committer: Daniel Templeton <te...@apache.org>
Committed: Tue Dec 27 07:18:08 2016 -0800
----------------------------------------------------------------------
.../hadoop-common/src/site/markdown/CommandsManual.md | 6 ++++++
.../src/site/markdown/MapredCommands.md | 6 ++++++
.../hadoop-yarn-site/src/site/markdown/YarnCommands.md | 6 ++++++
3 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded2d08f/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md b/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
index ef76810..696848b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md
@@ -223,6 +223,12 @@ Usage: `hadoop CLASSNAME`
Runs the class named `CLASSNAME`. The class must be part of a package.
+### `envvars`
+
+Usage: `hadoop envvars`
+
+Displays computed Hadoop environment variables.
+
Administration Commands
-----------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded2d08f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
index f312d31..6b7de2b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
@@ -140,6 +140,12 @@ Prints the version.
Usage: `mapred version`
+### `envvars`
+
+Usage: `mapred envvars`
+
+Displays computed Hadoop environment variables.
+
Administration Commands
-----------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded2d08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
index 1d51b1f..56096f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
@@ -166,6 +166,12 @@ Usage: `yarn version`
Prints the Hadoop version.
+### `envvars`
+
+Usage: `yarn envvars`
+
+Displays computed Hadoop environment variables.
+
Administration Commands
-----------------------