You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/04 02:02:28 UTC
[49/50] [abbrv] hadoop git commit: HDFS-9683. DiskBalancer: Add
cancelPlan implementation. (Contributed by Anu Engineer)
HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Contributed by Anu Engineer)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/889dc8b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/889dc8b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/889dc8b5
Branch: refs/heads/HDFS-1312
Commit: 889dc8b55340064b6499398b3f99f7c65879ce8e
Parents: 3bb9d5e
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Mar 3 17:00:52 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Mar 3 17:00:52 2016 -0800
----------------------------------------------------------------------
.../hadoop-hdfs/HDFS-1312_CHANGES.txt | 3 +
.../hadoop/hdfs/server/datanode/DataNode.java | 7 +-
.../hdfs/server/datanode/DiskBalancer.java | 26 +++
.../diskbalancer/DiskBalancerException.java | 3 +-
.../diskbalancer/TestDiskBalancerRPC.java | 213 ++++++++++++-------
5 files changed, 173 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index 07403cf..919d73e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -30,3 +30,6 @@ HDFS-1312 Change Log
HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via
Arpit Agarwal)
+ HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Anu Engineer via
+ Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index f440a3a..bfc1972 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3280,12 +3280,15 @@ public class DataNode extends ReconfigurableBase
this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false);
}
+ /**
+ * Cancels the disk balancer plan identified by planID; checks superuser privilege first.
+ * @param planID - Hash string that identifies the plan to cancel
+ */
@Override
public void cancelDiskBalancePlan(String planID) throws
IOException {
checkSuperuserPrivilege();
- throw new DiskBalancerException("Not Implemented",
- DiskBalancerException.Result.INTERNAL_ERROR);
+ this.diskBalancer.cancelPlan(planID);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index c01fb4e..81dbb2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -197,6 +197,32 @@ public class DiskBalancer {
}
/**
+ * Cancels the current plan if planID matches it; if its future is not yet done, sets the block mover's exit flag and shuts down the executor.
+ * @param planID - Hash of the plan to cancel.
+ * @throws DiskBalancerException - with Result.NO_SUCH_PLAN when no plan is set or planID does not match the current plan.
+ */
+ public void cancelPlan(String planID) throws DiskBalancerException {
+ lock.lock();
+ try {
+ checkDiskBalancerEnabled(); // presumably throws when the disk balancer is disabled -- confirm against its definition
+ if ((this.planID == null) || (!this.planID.equals(planID))) {
+ LOG.error("Disk Balancer - No such plan. Cancel plan failed. PlanID: " +
+ planID);
+ throw new DiskBalancerException("No such plan.",
+ DiskBalancerException.Result.NO_SUCH_PLAN);
+ }
+ if (!this.future.isDone()) { // NOTE(review): assumes future is non-null whenever planID is set -- confirm
+ this.blockMover.setExitFlag(); // exit flag is set before the executor is shut down
+ shutdownExecutor();
+ }
+ } finally {
+ lock.unlock(); // always released, including when NO_SUCH_PLAN is thrown above
+ }
+ }
+
+
+
+ /**
* Throws if Disk balancer is disabled.
*
* @throws DiskBalancerException
http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
index a5e1581..00fe53d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
@@ -35,7 +35,8 @@ public class DiskBalancerException extends IOException {
PLAN_ALREADY_IN_PROGRESS,
INVALID_VOLUME,
INVALID_MOVE,
- INTERNAL_ERROR
+ INTERNAL_ERROR,
+ NO_SUCH_PLAN
}
private final Result result;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 974e973..e29b3b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p/>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -37,6 +37,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
@@ -63,103 +64,163 @@ public class TestDiskBalancerRPC {
}
@Test
- public void testSubmitTestRpc() throws Exception {
- final int dnIndex = 0;
- cluster.restartDataNode(dnIndex);
- cluster.waitActive();
- ClusterConnector nameNodeConnector =
- ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+ public void testSubmitPlan() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = rpcTestHelper.getPlanHash();
+ int planVersion = rpcTestHelper.getPlanVersion();
+ NodePlan plan = rpcTestHelper.getPlan();
+ dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+ }
- DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
- diskBalancerCluster.readClusterInfo();
- Assert.assertEquals(cluster.getDataNodes().size(),
- diskBalancerCluster.getNodes().size());
- diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+ @Test
+ public void testSubmitPlanWithInvalidHash() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = rpcTestHelper.getPlanHash();
+ char hashArray[] = planHash.toCharArray();
+ hashArray[0]++;
+ planHash = String.valueOf(hashArray);
+ int planVersion = rpcTestHelper.getPlanVersion();
+ NodePlan plan = rpcTestHelper.getPlan();
+ thrown.expect(DiskBalancerException.class);
+ dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+ }
- DataNode dataNode = cluster.getDataNodes().get(dnIndex);
- DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
- dataNode.getDatanodeUuid());
- GreedyPlanner planner = new GreedyPlanner(10.0f, node);
- NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
- ());
- planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
- final int planVersion = 1; // So far we support only one version.
+ @Test
+ public void testSubmitPlanWithInvalidVersion() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = rpcTestHelper.getPlanHash();
+ int planVersion = rpcTestHelper.getPlanVersion();
+ planVersion++;
+ NodePlan plan = rpcTestHelper.getPlan();
+ thrown.expect(DiskBalancerException.class);
+ dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+ }
- String planHash = DigestUtils.sha512Hex(plan.toJson());
+ @Test
+ public void testSubmitPlanWithInvalidPlan() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = rpcTestHelper.getPlanHash();
+ int planVersion = rpcTestHelper.getPlanVersion();
+ NodePlan plan = rpcTestHelper.getPlan();
+ thrown.expect(DiskBalancerException.class);
+ dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
+ }
+ @Test
+ public void testCancelPlan() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = rpcTestHelper.getPlanHash();
+ int planVersion = rpcTestHelper.getPlanVersion();
+ NodePlan plan = rpcTestHelper.getPlan();
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+ dataNode.cancelDiskBalancePlan(planHash);
}
@Test
- public void testCancelTestRpc() throws Exception {
- final int dnIndex = 0;
- cluster.restartDataNode(dnIndex);
- cluster.waitActive();
- ClusterConnector nameNodeConnector =
- ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
- DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
- diskBalancerCluster.readClusterInfo();
- Assert.assertEquals(cluster.getDataNodes().size(),
- diskBalancerCluster.getNodes().size());
- diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
- DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
- GreedyPlanner planner = new GreedyPlanner(10.0f, node);
- NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
- ());
- planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-
- final int planVersion = 0; // So far we support only one version.
- DataNode dataNode = cluster.getDataNodes().get(dnIndex);
- String planHash = DigestUtils.sha512Hex(plan.toJson());
-
- // Since submitDiskBalancerPlan is not implemented yet, it throws an
- // Exception, this will be modified with the actual implementation.
- try {
- dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
- } catch (DiskBalancerException ex) {
- // Let us ignore this for time being.
- }
+ public void testCancelNonExistentPlan() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = rpcTestHelper.getPlanHash();
+ char hashArray[] = planHash.toCharArray();
+ hashArray[0]++;
+ planHash = String.valueOf(hashArray);
+ NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
dataNode.cancelDiskBalancePlan(planHash);
}
@Test
- public void testQueryTestRpc() throws Exception {
- final int dnIndex = 0;
- cluster.restartDataNode(dnIndex);
- cluster.waitActive();
- ClusterConnector nameNodeConnector =
- ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
- DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster
- (nameNodeConnector);
- diskBalancerCluster.readClusterInfo();
- Assert.assertEquals(cluster.getDataNodes().size(),
- diskBalancerCluster.getNodes().size());
- diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
- DataNode dataNode = cluster.getDataNodes().get(dnIndex);
- DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
- dataNode.getDatanodeUuid());
- GreedyPlanner planner = new GreedyPlanner(10.0f, node);
- NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
- ());
- planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-
- final int planVersion = 1; // So far we support only one version.
- String planHash = DigestUtils.sha512Hex(plan.toJson());
- dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+ public void testCancelEmptyPlan() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = "";
+ NodePlan plan = rpcTestHelper.getPlan();
+ thrown.expect(DiskBalancerException.class);
+ dataNode.cancelDiskBalancePlan(planHash);
+ }
+
+
+ @Test
+ public void testQueryPlan() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+ String planHash = rpcTestHelper.getPlanHash();
+ int planVersion = rpcTestHelper.getPlanVersion();
+ NodePlan plan = rpcTestHelper.getPlan();
+
+ dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
status.getResult() == PLAN_DONE);
}
@Test
- public void testgetDiskBalancerSetting() throws Exception {
+ public void testQueryPlanWithoutSubmit() throws Exception {
+ RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+ DataNode dataNode = rpcTestHelper.getDataNode();
+
+ DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
+ Assert.assertTrue(status.getResult() == NO_PLAN);
+ }
+
+ @Test
+ public void testGetDiskBalancerSetting() throws Exception {
final int dnIndex = 0;
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
thrown.expect(DiskBalancerException.class);
dataNode.getDiskBalancerSetting(
DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
}
+
+ private class RpcTestHelper { // Shared test fixture: invoke() restarts DataNode 0 and builds a plan plus its hash for it.
+ private NodePlan plan; // all four fields are assigned only by invoke()
+ private int planVersion;
+ private DataNode dataNode;
+ private String planHash;
+
+ public NodePlan getPlan() {
+ return plan;
+ }
+
+ public int getPlanVersion() {
+ return planVersion;
+ }
+
+ public DataNode getDataNode() {
+ return dataNode;
+ }
+
+ public String getPlanHash() {
+ return planHash;
+ }
+
+ public RpcTestHelper invoke() throws Exception { // fills the fields above and returns this so calls can be chained
+ final int dnIndex = 0;
+ cluster.restartDataNode(dnIndex); // cluster and conf are not declared in this helper; they come from the enclosing scope
+ cluster.waitActive();
+ ClusterConnector nameNodeConnector =
+ ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+ DiskBalancerCluster diskBalancerCluster =
+ new DiskBalancerCluster(nameNodeConnector);
+ diskBalancerCluster.readClusterInfo();
+ Assert.assertEquals(cluster.getDataNodes().size(), // node count read via the connector must equal the cluster's DataNode count
+ diskBalancerCluster.getNodes().size());
+ diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+ dataNode = cluster.getDataNodes().get(dnIndex);
+ DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
+ dataNode.getDatanodeUuid());
+ GreedyPlanner planner = new GreedyPlanner(10.0f, node);
+ plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort());
+ planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); // only the "DISK" volume set is planned
+ planVersion = 1; // So far we support only one version.
+ planHash = DigestUtils.sha512Hex(plan.toJson()); // plan hash is the SHA-512 hex digest of the plan's JSON
+ return this;
+ }
+ }
}