You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 06:05:42 UTC
[11/49] hadoop git commit: HDFS-9588. DiskBalancer: Add
submitDiskbalancer RPC. (Anu Engineer via Arpit Agarwal)
HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7100c0da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7100c0da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7100c0da
Branch: refs/heads/trunk
Commit: 7100c0da353d0960d3db71b029a36247838b24c6
Parents: 5724a10
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Jan 11 20:31:18 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:18:48 2016 -0700
----------------------------------------------------------------------
.../hdfs/protocol/ClientDatanodeProtocol.java | 7 ++
.../ClientDatanodeProtocolTranslatorPB.java | 32 +++++++
.../src/main/proto/ClientDatanodeProtocol.proto | 23 ++++++
.../hadoop-hdfs/HDFS-1312_CHANGES.txt | 3 +
...tDatanodeProtocolServerSideTranslatorPB.java | 27 ++++++
.../hadoop/hdfs/server/datanode/DataNode.java | 24 ++++++
.../diskbalancer/DiskbalancerException.java | 86 +++++++++++++++++++
.../diskbalancer/TestDiskBalancerRPC.java | 87 ++++++++++++++++++++
8 files changed, 289 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index e541388..6e9cef0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -163,4 +163,11 @@ public interface ClientDatanodeProtocol {
* @return balancer bandwidth
*/
long getBalancerBandwidth() throws IOException;
+
+ /**
+ * Submit a disk balancer plan for execution.
+ *
+ * @param planID hash string identifying the plan (SHA-512 per the proto
+ * comment and the test, which uses sha512Hex).
+ * @param planVersion version of the plan data format; reserved for
+ * future use.
+ * @param bandwidth maximum disk bandwidth to consume; 0 means use the
+ * value defined in configuration.
+ * @param plan the plan itself, as a JSON string.
+ * @throws IOException if the datanode fails to accept the plan.
+ */
+ void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth,
+ String plan) throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 6aaa025..da8d962 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetRe
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -326,4 +327,35 @@ public class ClientDatanodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ /**
+ * Submits a disk balancer plan to the datanode.
+ * @param planID - SHA-512 hash string of the plan that is
+ * submitted. This is used by clients when they want to find
+ * local copies of these plans.
+ * @param planVersion - The data format version of the plans; reserved
+ * for future use, not used now.
+ * @param bandwidth - Maximum disk bandwidth to consume, setting this value
+ * to zero allows datanode to use the value defined in
+ * configuration.
+ * @param plan - Actual plan, as a JSON string.
+ * @throws IOException if the RPC fails or the datanode rejects the plan.
+ */
+ @Override
+ public void submitDiskBalancerPlan(String planID, long planVersion,
+ long bandwidth, String plan) throws IOException {
+ try {
+ SubmitDiskBalancerPlanRequestProto request =
+ SubmitDiskBalancerPlanRequestProto.newBuilder()
+ .setPlanID(planID)
+ .setPlanVersion(planVersion)
+ .setMaxDiskBandwidth(bandwidth)
+ .setPlan(plan)
+ .build();
+ rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ // Unwrap the remote exception carried inside the ServiceException.
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index e135df8..d11979b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -150,6 +150,23 @@ message GetBalancerBandwidthResponseProto {
}
/**
+ * This message allows a client to submit a disk
+ * balancer plan to a data node for execution.
+ */
+message SubmitDiskBalancerPlanRequestProto {
+ required string planID = 1; // A hash of the plan (SHA-512), used as its identifier
+ required string plan = 2; // JSON string that describes the plan
+ optional uint64 planVersion = 3; // Plan format version; reserved for future use
+ optional uint64 maxDiskBandwidth = 4; // Optional bandwidth cap; absent or 0 means use the configured value
+}
+
+/**
+ * Response from the DataNode on Plan Submit request.
+ * Empty for now; success is indicated by the absence of an RPC error.
+ */
+message SubmitDiskBalancerPlanResponseProto {
+}
+
+/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
*/
@@ -207,4 +224,10 @@ service ClientDatanodeProtocolService {
*/
rpc getBalancerBandwidth(GetBalancerBandwidthRequestProto)
returns(GetBalancerBandwidthResponseProto);
+
+ /**
+ * Submit a disk balancer plan for execution
+ */
+ rpc submitDiskBalancerPlan(SubmitDiskBalancerPlanRequestProto)
+ returns (SubmitDiskBalancerPlanResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index 940e1b5..6d8cde0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -13,3 +13,6 @@ HDFS-1312 Change Log
HDFS-9469. DiskBalancer: Add Planner. (Anu Engineer via Arpit Agarwal)
+ HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via
+ Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index e0401f7..824f050 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.Start
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -232,4 +234,29 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
return GetBalancerBandwidthResponseProto.newBuilder()
.setBandwidth(bandwidth).build();
}
+
+ /**
+ * Submit a disk balancer plan for execution on the datanode.
+ * @param controller - RpcController (unused).
+ * @param request - Request carrying the plan and its metadata.
+ * @return SubmitDiskBalancerPlanResponseProto - empty response; success is
+ * indicated by the absence of an exception.
+ * @throws ServiceException wrapping any failure reported by the datanode.
+ */
+ @Override
+ public SubmitDiskBalancerPlanResponseProto submitDiskBalancerPlan(
+ RpcController controller, SubmitDiskBalancerPlanRequestProto request)
+ throws ServiceException {
+ try {
+ // Absent optional proto fields are passed as 0, which the datanode
+ // treats as "use the configured/default value".
+ impl.submitDiskBalancerPlan(request.getPlanID(),
+ request.hasPlanVersion() ? request.getPlanVersion() : 0,
+ request.hasMaxDiskBandwidth() ? request.getMaxDiskBandwidth() : 0,
+ request.getPlan());
+ SubmitDiskBalancerPlanResponseProto response =
+ SubmitDiskBalancerPlanResponseProto.newBuilder()
+ .build();
+ return response;
+ } catch(Exception e) {
+ throw new ServiceException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a59a59f..e06555f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -169,6 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3286,4 +3287,27 @@ public class DataNode extends ReconfigurableBase
public Tracer getTracer() {
return tracer;
}
+
+ /**
+ * Allows submission of a disk balancer Job.
+ * @param planID - Hash value of the plan.
+ * @param planVersion - Plan version, reserved for future use. We have only
+ * version 1 now.
+ * @param bandwidth - Max disk bandwidth to use, 0 means use value defined
+ * in the configuration.
+ * @param plan - Actual plan.
+ * @throws IOException - currently always throws DiskbalancerException,
+ * since the feature is not implemented yet.
+ */
+ @Override
+ public void submitDiskBalancerPlan(String planID,
+ long planVersion, long bandwidth, String plan) throws IOException {
+
+ // TODO : This will be replaced with actual code later.
+ // Right now throwing DiskbalancerException instead of
+ // NotImplementedException to indicate that the eventual disk balancer
+ // code will throw DiskbalancerException.
+ throw new DiskbalancerException("Not Implemented", 0);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
new file mode 100644
index 0000000..9d47dc3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import java.io.IOException;
+
+/**
+ * Disk Balancer exception: an {@link IOException} that also carries an
+ * integer result code, retrievable via {@link #getResult()}.
+ */
+public class DiskbalancerException extends IOException {
+ // Result code reported by the disk balancer. NOTE(review): the semantics
+ // of the code are not defined in this change (the only caller passes 0) —
+ // confirm once the disk balancer implementation lands.
+ private int result;
+
+ /**
+ * Constructs a DiskbalancerException with the specified detail message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the {@link #getMessage()} method)
+ * @param result The result code to report.
+ */
+ public DiskbalancerException(String message, int result) {
+ super(message);
+ this.result = result;
+ }
+
+ /**
+ * Constructs a DiskbalancerException with the specified detail message and
+ * cause.
+ * <p/>
+ * <p> Note that the detail message associated with {@code cause} is
+ * <i>not</i> automatically incorporated into this exception's detail
+ * message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the {@link #getMessage()} method)
+ * @param cause The cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). A null value is permitted,
+ * and indicates that the cause is nonexistent or unknown.
+ * @param result The result code to report.
+ */
+ public DiskbalancerException(String message, Throwable cause, int result) {
+ super(message, cause);
+ this.result = result;
+ }
+
+ /**
+ * Constructs a DiskbalancerException with the specified cause and a detail
+ * message of {@code (cause==null ? null : cause.toString())} (which
+ * typically contains the class and detail message of {@code cause}).
+ * Useful for IO exceptions that are little more than wrappers for other
+ * throwables.
+ *
+ * @param cause The cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). A null value is permitted,
+ * and indicates that the cause is nonexistent or unknown.
+ * @param result The result code to report.
+ */
+ public DiskbalancerException(Throwable cause, int result) {
+ super(cause);
+ this.result = result;
+ }
+
+ /**
+ * Returns the result code supplied when this exception was created.
+ * @return int result code
+ */
+ public int getResult() {
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7100c0da/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
new file mode 100644
index 0000000..e047d5a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.net.URI;
+
+/**
+ * Tests the disk balancer RPC plumbing: builds a plan from a canned cluster
+ * description and submits it to a datanode in a MiniDFSCluster.
+ */
+public class TestDiskBalancerRPC {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ // Two-datanode mini cluster shared by the test; torn down after each run.
+ private MiniDFSCluster cluster;
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ cluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ // NOTE(review): method name should be lowerCamelCase (e.g. testSubmitRpc)
+ // per Java naming convention.
+ @Test
+ public void TestSubmitTestRpc() throws Exception {
+ // Build a NodePlan from a canned 3-node, 3-disk cluster description
+ // shipped as a test resource.
+ URI clusterJson = getClass()
+ .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI();
+ ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
+ null);
+ DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(jsonConnector);
+ diskBalancerCluster.readClusterInfo();
+ Assert.assertEquals(3, diskBalancerCluster.getNodes().size());
+ diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+ DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
+ GreedyPlanner planner = new GreedyPlanner(10.0f, node);
+ NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
+ ());
+ planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
+
+ final int dnIndex = 0;
+ final int planVersion = 0; // So far we support only one version.
+ DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+ // The plan ID is the SHA-512 hex digest of the plan's JSON form.
+ String planHash = DigestUtils.sha512Hex(plan.toJson());
+
+ // Since submitDiskBalancerPlan is not implemented yet, it throws an
+ // Exception; this will be modified with the actual implementation.
+ thrown.expect(DiskbalancerException.class);
+ dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org