You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 01:36:12 UTC

[16/49] hadoop git commit: HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via Arpit Agarwal)

HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7100c0da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7100c0da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7100c0da

Branch: refs/heads/HDFS-1312
Commit: 7100c0da353d0960d3db71b029a36247838b24c6
Parents: 5724a10
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Jan 11 20:31:18 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:18:48 2016 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/ClientDatanodeProtocol.java   |  7 ++
 .../ClientDatanodeProtocolTranslatorPB.java     | 32 +++++++
 .../src/main/proto/ClientDatanodeProtocol.proto | 23 ++++++
 .../hadoop-hdfs/HDFS-1312_CHANGES.txt           |  3 +
 ...tDatanodeProtocolServerSideTranslatorPB.java | 27 ++++++
 .../hadoop/hdfs/server/datanode/DataNode.java   | 24 ++++++
 .../diskbalancer/DiskbalancerException.java     | 86 +++++++++++++++++++
 .../diskbalancer/TestDiskBalancerRPC.java       | 87 ++++++++++++++++++++
 8 files changed, 289 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index e541388..6e9cef0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -163,4 +163,11 @@ public interface ClientDatanodeProtocol {
    * @return balancer bandwidth
    */
   long getBalancerBandwidth() throws IOException;
+
+  /**
+   * Submit a disk balancer plan for execution.
+   */
+  void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth,
+                              String plan) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 6aaa025..da8d962 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetRe
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -326,4 +327,35 @@ public class ClientDatanodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  /**
+   * Submits a disk balancer plan to the datanode.
+   * @param planID - Plan ID is the hash (e.g. SHA-512) string of the plan
+   *               submitted. This is used by clients when they want to find
+   *               local copies of these plans.
+   * @param planVersion - The data format of the plan; reserved for future
+   *                    use, not used now.
+   * @param bandwidth - Maximum disk bandwidth to consume, setting this value
+   *                  to zero allows datanode to use the value defined in
+   *                  configuration.
+   * @param plan - Actual plan.
+   * @throws IOException - the remote exception extracted from the
+   *                     ServiceException if the RPC call fails.
+   */
+  @Override
+  public void submitDiskBalancerPlan(String planID, long planVersion,
+      long bandwidth, String plan) throws IOException {
+    try {
+      SubmitDiskBalancerPlanRequestProto request =
+          SubmitDiskBalancerPlanRequestProto.newBuilder()
+              .setPlanID(planID)
+              .setPlanVersion(planVersion)
+              .setMaxDiskBandwidth(bandwidth)
+              .setPlan(plan)
+              .build();
+      rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index e135df8..d11979b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -150,6 +150,23 @@ message GetBalancerBandwidthResponseProto {
 }
 
 /**
+ * This message allows a client to submit a disk
+ * balancer plan to a data node.
+ */
+message SubmitDiskBalancerPlanRequestProto {
+  required string planID = 1; // A hash of the plan like SHA512
+  required string plan = 2; // Json String that describes the plan
+  optional uint64 planVersion = 3; // Plan version number
+  optional uint64 maxDiskBandwidth = 4; // optional bandwidth control.
+}
+
+/**
+ * Response from the DataNode on Plan Submit request
+ */
+message SubmitDiskBalancerPlanResponseProto {
+}
+
+/**
  * Protocol used from client to the Datanode.
  * See the request and response for details of rpc call.
  */
@@ -207,4 +224,10 @@ service ClientDatanodeProtocolService {
    */
   rpc getBalancerBandwidth(GetBalancerBandwidthRequestProto)
       returns(GetBalancerBandwidthResponseProto);
+
+  /**
+   * Submit a disk balancer plan for execution
+   */
+  rpc submitDiskBalancerPlan(SubmitDiskBalancerPlanRequestProto)
+      returns (SubmitDiskBalancerPlanResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index 940e1b5..6d8cde0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -13,3 +13,6 @@ HDFS-1312 Change Log
 
     HDFS-9469. DiskBalancer: Add Planner. (Anu Engineer via Arpit Agarwal)
 
+    HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via
+    Arpit Agarwal)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index e0401f7..824f050 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.Start
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanResponseProto;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -232,4 +234,29 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
     return GetBalancerBandwidthResponseProto.newBuilder()
         .setBandwidth(bandwidth).build();
   }
+
+  /**
+   * Submit a disk balancer plan for execution.
+   * @param controller  - RpcController
+   * @param request   - Request; unset planVersion/maxDiskBandwidth pass as 0
+   * @return   Empty response on success
+   * @throws ServiceException - wraps any exception raised while handling it
+   */
+  @Override
+  public SubmitDiskBalancerPlanResponseProto submitDiskBalancerPlan(
+      RpcController controller, SubmitDiskBalancerPlanRequestProto request)
+      throws ServiceException {
+    try {
+      impl.submitDiskBalancerPlan(request.getPlanID(),
+          request.hasPlanVersion() ? request.getPlanVersion() : 0,
+          request.hasMaxDiskBandwidth() ? request.getMaxDiskBandwidth() : 0,
+          request.getPlan());
+      SubmitDiskBalancerPlanResponseProto response =
+          SubmitDiskBalancerPlanResponseProto.newBuilder()
+              .build();
+      return response;
+    } catch(Exception e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a59a59f..e06555f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -169,6 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3286,4 +3287,27 @@ public class DataNode extends ReconfigurableBase
   public Tracer getTracer() {
     return tracer;
   }
+
+  /**
+   * Allows submission of a disk balancer Job.
+   * @param planID  - Hash value of the plan.
+   * @param planVersion - Plan version, reserved for future use. We have only
+   *                    version 1 now.
+   * @param bandwidth - Max disk bandwidth to use, 0 means use value defined
+   *                  in the configuration.
+   * @param plan - Actual plan
+   * @throws IOException - currently always a DiskbalancerException, since
+   *                     this method is not implemented yet.
+   */
+  @Override
+  public void submitDiskBalancerPlan(String planID,
+      long planVersion, long bandwidth, String plan) throws IOException {
+
+    // TODO : This will be replaced with actual code later.
+    // Right now we throw DiskbalancerException instead of
+    // NotImplementedException to indicate that eventually the disk balancer
+    // code will throw DiskbalancerException.
+    throw new DiskbalancerException("Not Implemented", 0);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
new file mode 100644
index 0000000..9d47dc3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} for disk balancer errors that carries a result code.
+ */
+public class DiskbalancerException extends IOException {
+  private int result;
+
+  /**
+   * Constructs a {@code DiskbalancerException} with the given detail message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   *                the {@link #getMessage()} method)
+   * @param result  The result code, stored and returned by {@link #getResult()}
+   */
+  public DiskbalancerException(String message, int result) {
+    super(message);
+    this.result = result;
+  }
+
+  /**
+   * Constructs a {@code DiskbalancerException} with the given detail message,
+   * cause and result code.
+   *
+   * <p>Note that the detail message associated with {@code cause} is
+   * <i>not</i> automatically incorporated into this exception's detail
+   * message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   *                the {@link #getMessage()} method)
+   * @param cause   The cause (which is saved for later retrieval by the {@link
+   *                #getCause()} method).  (A null value is permitted, and
+   *                indicates that the cause is nonexistent or unknown.)
+   * @param result  The result code, stored and returned by
+   *                {@link #getResult()}
+   */
+  public DiskbalancerException(String message, Throwable cause, int result) {
+    super(message, cause);
+    this.result = result;
+  }
+
+  /**
+   * Constructs a {@code DiskbalancerException} with the given cause and result
+   * code.
+   *
+   * <p>The detail message is {@code (cause==null ? null : cause.toString())}
+   * (which typically contains the class and detail message of {@code cause}).
+   * This constructor is useful for exceptions that are little more than
+   * wrappers for other throwables.
+   *
+   * @param cause  The cause (which is saved for later retrieval by the {@link
+   *               #getCause()} method).  (A null value is permitted, and
+   *               indicates that the cause is nonexistent or unknown.)
+   * @param result The result code, stored and returned by {@link #getResult()}
+   */
+  public DiskbalancerException(Throwable cause, int result) {
+    super(cause);
+    this.result = result;
+  }
+
+  /**
+   * Returns the result code supplied when this exception was constructed.
+   * @return the result code
+   */
+  public int getResult() {
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
new file mode 100644
index 0000000..e047d5a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.net.URI;
+
+public class TestDiskBalancerRPC {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private MiniDFSCluster cluster;
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void TestSubmitTestRpc() throws Exception {
+    URI clusterJson = getClass()
+        .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI();
+    ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
+        null);
+    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(jsonConnector);
+    diskBalancerCluster.readClusterInfo();
+    Assert.assertEquals(3, diskBalancerCluster.getNodes().size());
+    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+    DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
+    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
+    NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
+        ());
+    planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
+
+    final int dnIndex = 0;
+    final int planVersion = 0; // So far we support only one version.
+    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+    String planHash = DigestUtils.sha512Hex(plan.toJson());
+
+    // submitDiskBalancerPlan is not implemented yet, so it is expected to throw
+    // DiskbalancerException. Update this once the real implementation lands.
+    thrown.expect(DiskbalancerException.class);
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org