Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/04 02:02:28 UTC

[49/50] [abbrv] hadoop git commit: HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Contributed by Anu Engineer)

HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Contributed by Anu Engineer)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/889dc8b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/889dc8b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/889dc8b5

Branch: refs/heads/HDFS-1312
Commit: 889dc8b55340064b6499398b3f99f7c65879ce8e
Parents: 3bb9d5e
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Mar 3 17:00:52 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Mar 3 17:00:52 2016 -0800

----------------------------------------------------------------------
 .../hadoop-hdfs/HDFS-1312_CHANGES.txt           |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   7 +-
 .../hdfs/server/datanode/DiskBalancer.java      |  26 +++
 .../diskbalancer/DiskBalancerException.java     |   3 +-
 .../diskbalancer/TestDiskBalancerRPC.java       | 213 ++++++++++++-------
 5 files changed, 173 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index 07403cf..919d73e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -30,3 +30,6 @@ HDFS-1312 Change Log
     HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via
     Arpit Agarwal)
 
+    HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Anu Engineer via
+    Arpit Agarwal)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index f440a3a..bfc1972 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3280,12 +3280,15 @@ public class DataNode extends ReconfigurableBase
     this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false);
   }
 
+  /**
+   * Cancels a running plan.
+   * @param planID - Hash string that identifies a plan
+   */
   @Override
   public void cancelDiskBalancePlan(String planID) throws
       IOException {
     checkSuperuserPrivilege();
-    throw new DiskBalancerException("Not Implemented",
-        DiskBalancerException.Result.INTERNAL_ERROR);
+    this.diskBalancer.cancelPlan(planID);
   }
 
   /**
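
Seen from the client side, the DataNode RPC surface for the disk balancer is now submit, query, and cancel. A minimal sketch of the submit/cancel round trip, using the same names the tests further down use (plan, planHash, and planVersion come from the GreedyPlanner setup; the bandwidth argument of 10 mirrors the tests):

    // Both calls require superuser privilege on the DataNode.
    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
    // ... later, while the plan is still executing:
    dataNode.cancelDiskBalancePlan(planHash);  // hash must match the running plan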

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index c01fb4e..81dbb2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -197,6 +197,32 @@ public class DiskBalancer {
   }
 
   /**
+   * Cancels a running plan.
+   * @param planID - Hash of the plan to cancel.
+   * @throws DiskBalancerException
+   */
+  public void cancelPlan(String planID) throws DiskBalancerException {
+    lock.lock();
+    try {
+      checkDiskBalancerEnabled();
+      if ((this.planID == null) || (!this.planID.equals(planID))) {
+        LOG.error("Disk Balancer - No such plan. Cancel plan failed. PlanID: " +
+            planID);
+        throw new DiskBalancerException("No such plan.",
+            DiskBalancerException.Result.NO_SUCH_PLAN);
+      }
+      if (!this.future.isDone()) {
+        this.blockMover.setExitFlag();
+        shutdownExecutor();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+
+
+  /**
    * Throws if Disk balancer is disabled.
    *
    * @throws DiskBalancerException
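
The cancel path follows the same discipline as submitPlan: validate under the lock, then signal the block mover and shut down the executor only if the Future has not already completed. A standalone sketch of that lock-and-exit-flag pattern (simplified names; this is an illustration of the technique, not the Hadoop class itself):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.locks.ReentrantLock;

    class PlanRunner {
      private final ReentrantLock lock = new ReentrantLock();
      private volatile boolean shouldExit;   // the exit flag the worker polls
      private String currentPlanId;          // non-null only while a plan runs
      private Future<?> future;
      private ExecutorService executor;

      void submit(String planId, Runnable work) {
        lock.lock();
        try {
          shouldExit = false;
          currentPlanId = planId;
          executor = Executors.newSingleThreadExecutor();
          future = executor.submit(work);
        } finally {
          lock.unlock();
        }
      }

      void cancel(String planId) {
        lock.lock();
        try {
          if (currentPlanId == null || !currentPlanId.equals(planId)) {
            throw new IllegalArgumentException("No such plan: " + planId);
          }
          if (!future.isDone()) {
            shouldExit = true;     // worker checks this between work items
            executor.shutdown();   // stop the executor; in-flight step finishes
          }
        } finally {
          lock.unlock();
        }
      }
    }

Checking the Future before tearing anything down means cancelling an already-finished plan is a quiet no-op, while a wrong hash fails loudly with NO_SUCH_PLAN.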

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
index a5e1581..00fe53d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
@@ -35,7 +35,8 @@ public class DiskBalancerException extends IOException {
     PLAN_ALREADY_IN_PROGRESS,
     INVALID_VOLUME,
     INVALID_MOVE,
-    INTERNAL_ERROR
+    INTERNAL_ERROR,
+    NO_SUCH_PLAN
   }
 
   private final Result result;
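
The Result code, not the message string, is the contract callers should branch on. A hedged example, assuming the conventional getResult() accessor backing the result field above (staleHash stands for any hash that no longer matches the running plan):

    try {
      dataNode.cancelDiskBalancePlan(staleHash);
    } catch (DiskBalancerException ex) {
      if (ex.getResult() == DiskBalancerException.Result.NO_SUCH_PLAN) {
        // Hash does not match the executing plan (or nothing is running);
        // for a cancel this can usually be treated as already-done.
      } else {
        throw ex;  // INTERNAL_ERROR and friends are real failures
      }
    }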

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889dc8b5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 974e973..e29b3b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -37,6 +37,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
 
@@ -63,103 +64,163 @@ public class TestDiskBalancerRPC {
   }
 
   @Test
-  public void testSubmitTestRpc() throws Exception {
-    final int dnIndex = 0;
-    cluster.restartDataNode(dnIndex);
-    cluster.waitActive();
-    ClusterConnector nameNodeConnector =
-        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+  public void testSubmitPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+  }
 
-    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
-    diskBalancerCluster.readClusterInfo();
-    Assert.assertEquals(cluster.getDataNodes().size(),
-                                    diskBalancerCluster.getNodes().size());
-    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+  @Test
+  public void testSubmitPlanWithInvalidHash() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    char hashArray[] = planHash.toCharArray();
+    hashArray[0]++;
+    planHash = String.valueOf(hashArray);
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+  }
 
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
-        dataNode.getDatanodeUuid());
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
-        ());
-    planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-    final int planVersion = 1; // So far we support only one version.
+  @Test
+  public void testSubmitPlanWithInvalidVersion() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    planVersion++;
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+  }
 
-    String planHash = DigestUtils.sha512Hex(plan.toJson());
+  @Test
+  public void testSubmitPlanWithInvalidPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
+  }
 
+  @Test
+  public void testCancelPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
     dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+    dataNode.cancelDiskBalancePlan(planHash);
   }
 
   @Test
-  public void testCancelTestRpc() throws Exception {
-    final int dnIndex = 0;
-    cluster.restartDataNode(dnIndex);
-    cluster.waitActive();
-    ClusterConnector nameNodeConnector =
-        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
-    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
-    diskBalancerCluster.readClusterInfo();
-    Assert.assertEquals(cluster.getDataNodes().size(),
-        diskBalancerCluster.getNodes().size());
-    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
-        ());
-    planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-
-    final int planVersion = 0; // So far we support only one version.
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    String planHash = DigestUtils.sha512Hex(plan.toJson());
-
-    // Since submitDiskBalancerPlan is not implemented yet, it throws an
-    // Exception, this will be modified with the actual implementation.
-    try {
-      dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-    } catch (DiskBalancerException ex) {
-      // Let us ignore this for time being.
-    }
+  public void testCancelNonExistentPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    char hashArray[] = planHash.toCharArray();
+    hashArray[0]++;
+    planHash = String.valueOf(hashArray);
+    NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
     dataNode.cancelDiskBalancePlan(planHash);
   }
 
   @Test
-  public void testQueryTestRpc() throws Exception {
-    final int dnIndex = 0;
-    cluster.restartDataNode(dnIndex);
-    cluster.waitActive();
-    ClusterConnector nameNodeConnector =
-        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
-    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster
-        (nameNodeConnector);
-    diskBalancerCluster.readClusterInfo();
-    Assert.assertEquals(cluster.getDataNodes().size(),
-        diskBalancerCluster.getNodes().size());
-    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
-        dataNode.getDatanodeUuid());
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
-        ());
-    planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-
-    final int planVersion = 1; // So far we support only one version.
-    String planHash = DigestUtils.sha512Hex(plan.toJson());
-      dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+  public void testCancelEmptyPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = "";
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.cancelDiskBalancePlan(planHash);
+  }
+
+
+  @Test
+  public void testQueryPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
     DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
     Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
         status.getResult() == PLAN_DONE);
   }
 
   @Test
-  public void testgetDiskBalancerSetting() throws Exception {
+  public void testQueryPlanWithoutSubmit() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+
+    DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
+    Assert.assertTrue(status.getResult() == NO_PLAN);
+  }
+
+  @Test
+  public void testGetDiskBalancerSetting() throws Exception {
     final int dnIndex = 0;
     DataNode dataNode = cluster.getDataNodes().get(dnIndex);
     thrown.expect(DiskBalancerException.class);
     dataNode.getDiskBalancerSetting(
         DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
   }
+
+  private class RpcTestHelper {
+    private NodePlan plan;
+    private int planVersion;
+    private DataNode dataNode;
+    private String planHash;
+
+    public NodePlan getPlan() {
+      return plan;
+    }
+
+    public int getPlanVersion() {
+      return planVersion;
+    }
+
+    public DataNode getDataNode() {
+      return dataNode;
+    }
+
+    public String getPlanHash() {
+      return planHash;
+    }
+
+    public RpcTestHelper invoke() throws Exception {
+      final int dnIndex = 0;
+      cluster.restartDataNode(dnIndex);
+      cluster.waitActive();
+      ClusterConnector nameNodeConnector =
+          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+      DiskBalancerCluster diskBalancerCluster =
+          new DiskBalancerCluster(nameNodeConnector);
+      diskBalancerCluster.readClusterInfo();
+      Assert.assertEquals(cluster.getDataNodes().size(),
+          diskBalancerCluster.getNodes().size());
+      diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+      dataNode = cluster.getDataNodes().get(dnIndex);
+      DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
+          dataNode.getDatanodeUuid());
+      GreedyPlanner planner = new GreedyPlanner(10.0f, node);
+      plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort());
+      planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
+      planVersion = 1;
+      planHash = DigestUtils.sha512Hex(plan.toJson());
+      return this;
+    }
+  }
 }
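
With the setup boilerplate folded into RpcTestHelper, a new RPC test is only a few lines. For instance, a hypothetical check that a second submit while a plan is active is rejected (the PLAN_ALREADY_IN_PROGRESS result code suggests that behavior, though this commit does not add such a test) could read:

    @Test
    public void testResubmitActivePlan() throws Exception {  // hypothetical
      RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
      DataNode dataNode = rpcTestHelper.getDataNode();
      String planHash = rpcTestHelper.getPlanHash();
      int planVersion = rpcTestHelper.getPlanVersion();
      NodePlan plan = rpcTestHelper.getPlan();
      dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
      thrown.expect(DiskBalancerException.class);
      dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
    }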