You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 06:05:46 UTC
[15/49] hadoop git commit: HDFS-9671. DiskBalancer: SubmitPlan
implementation. (Contributed by Anu Engineer)
HDFS-9671. DiskBalancer: SubmitPlan implementation. (Contributed by Anu Engineer)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b1b2faf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b1b2faf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b1b2faf
Branch: refs/heads/trunk
Commit: 2b1b2faf76a7ff148650a7836935a85439f60c49
Parents: 66f0bb6
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Feb 22 11:45:51 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:18:48 2016 -0700
----------------------------------------------------------------------
.../hdfs/protocol/ClientDatanodeProtocol.java | 4 +-
.../ClientDatanodeProtocolTranslatorPB.java | 10 +-
.../server/datanode/DiskBalancerWorkItem.java | 160 ++++++
.../server/datanode/DiskBalancerWorkStatus.java | 87 +++
.../hadoop/hdfs/server/datanode/WorkStatus.java | 85 ---
.../hadoop-hdfs/HDFS-1312_CHANGES.txt | 7 +-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
...tDatanodeProtocolServerSideTranslatorPB.java | 4 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 56 +-
.../hdfs/server/datanode/DiskBalancer.java | 542 +++++++++++++++++++
.../diskbalancer/DiskBalancerConstants.java | 9 +
.../diskbalancer/DiskBalancerException.java | 98 ++++
.../diskbalancer/DiskbalancerException.java | 86 ---
.../datamodel/DiskBalancerCluster.java | 14 +
.../diskbalancer/TestDiskBalancerRPC.java | 28 +-
15 files changed, 984 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index dede89e..d8df7fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
/** An client-datanode protocol for block recovery
*/
@@ -182,7 +182,7 @@ public interface ClientDatanodeProtocol {
/**
* Gets the status of an executing diskbalancer Plan.
*/
- WorkStatus queryDiskBalancerPlan() throws IOException;
+ DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException;
/**
* Gets a run-time configuration value from running diskbalancer instance.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index e7e0d94..786d834 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryP
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
@@ -345,8 +345,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
* to zero allows datanode to use the value defined in
* configration.
* @param plan - Actual plan.
- * @return Success or throws Exception.
- * @throws Exception
+ * @throws IOException
*/
@Override
public void submitDiskBalancerPlan(String planID, long planVersion,
@@ -387,13 +386,14 @@ public class ClientDatanodeProtocolTranslatorPB implements
* Gets the status of an executing diskbalancer Plan.
*/
@Override
- public WorkStatus queryDiskBalancerPlan() throws IOException {
+ public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
try {
QueryPlanStatusRequestProto request =
QueryPlanStatusRequestProto.newBuilder().build();
QueryPlanStatusResponseProto response =
rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request);
- return new WorkStatus(response.hasResult() ? response.getResult() : 0,
+ return new DiskBalancerWorkStatus(response.hasResult() ?
+ response.getResult() : 0,
response.hasPlanID() ? response.getPlanID() : null,
response.hasStatus() ? response.getStatus() : null,
response.hasCurrentStatus() ? response.getCurrentStatus() : null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
new file mode 100644
index 0000000..11730e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Keeps track of how much work has finished.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DiskBalancerWorkItem {
+ private final long bytesToCopy;
+ private long bytesCopied;
+ private long errorCount;
+ private String errMsg;
+ private long blocksCopied;
+
+ /**
+ * Constructs a DiskBalancerWorkItem.
+ *
+ * @param bytesToCopy - Total bytes to copy from a disk
+ * @param bytesCopied - Copied So far.
+ */
+ public DiskBalancerWorkItem(long bytesToCopy, long bytesCopied) {
+ this.bytesToCopy = bytesToCopy;
+ this.bytesCopied = bytesCopied;
+ }
+
+ /**
+ * Reads a DiskBalancerWorkItem Object from a Json String.
+ *
+ * @param json - Json String.
+ * @return DiskBalancerWorkItem Object
+ * @throws IOException
+ */
+ public static DiskBalancerWorkItem parseJson(String json) throws IOException {
+ Preconditions.checkNotNull(json);
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(json, DiskBalancerWorkItem.class);
+ }
+
+ /**
+ * Gets the error message.
+ */
+ public String getErrMsg() {
+ return errMsg;
+ }
+
+ /**
+ * Sets the error message.
+ *
+ * @param errMsg - Msg.
+ */
+ public void setErrMsg(String errMsg) {
+ this.errMsg = errMsg;
+ }
+
+ /**
+ * Returns the number of errors encountered.
+ *
+ * @return long
+ */
+ public long getErrorCount() {
+ return errorCount;
+ }
+
+ /**
+ * Increments the error count.
+ */
+ public void incErrorCount() {
+ this.errorCount++;
+ }
+
+ /**
+ * Returns bytes copied so far.
+ *
+ * @return long
+ */
+ public long getBytesCopied() {
+ return bytesCopied;
+ }
+
+ /**
+ * Sets bytes copied so far.
+ *
+ * @param bytesCopied - long
+ */
+ public void setBytesCopied(long bytesCopied) {
+ this.bytesCopied = bytesCopied;
+ }
+
+ /**
+ * Increments bytesCopied by delta.
+ *
+ * @param delta - long
+ */
+ public void incCopiedSoFar(long delta) {
+ this.bytesCopied += delta;
+ }
+
+ /**
+ * Returns bytes to copy.
+ *
+ * @return - long
+ */
+ public long getBytesToCopy() {
+ return bytesToCopy;
+ }
+
+ /**
+ * Returns number of blocks copied for this DiskBalancerWorkItem.
+ *
+ * @return long count of blocks.
+ */
+ public long getBlocksCopied() {
+ return blocksCopied;
+ }
+
+ /**
+ * increments the number of blocks copied.
+ */
+ public void incBlocksCopied() {
+ blocksCopied++;
+ }
+
+ /**
+ * returns a serialized json string.
+ *
+ * @return String - json
+ * @throws IOException
+ */
+ public String toJson() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.writeValueAsString(this);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
new file mode 100644
index 0000000..6b29ce8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Helper class that reports how much work has been done by the node.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DiskBalancerWorkStatus {
+ private final int result;
+ private final String planID;
+ private final String status;
+ private final String currentState;
+
+ /**
+ * Constructs a workStatus Object.
+ *
+ * @param result - int
+ * @param planID - Plan ID
+ * @param status - Current Status
+ * @param currentState - Current State
+ */
+ public DiskBalancerWorkStatus(int result, String planID, String status,
+ String currentState) {
+ this.result = result;
+ this.planID = planID;
+ this.status = status;
+ this.currentState = currentState;
+ }
+
+ /**
+ * Returns result.
+ *
+ * @return int
+ */
+ public int getResult() {
+ return result;
+ }
+
+ /**
+ * Returns planID.
+ *
+ * @return String
+ */
+ public String getPlanID() {
+ return planID;
+ }
+
+ /**
+ * Returns Status.
+ *
+ * @return String
+ */
+ public String getStatus() {
+ return status;
+ }
+
+ /**
+ * Gets current state.
+ *
+ * @return - Json String
+ */
+ public String getCurrentState() {
+ return currentState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java
deleted file mode 100644
index 259a311..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Helper class that reports how much work has has been done by the node.
- */
-@InterfaceAudience.Private
-public class WorkStatus {
- private int result;
- private String planID;
- private String status;
- private String currentState;
-
- /**
- * Constructs a workStatus Object.
- *
- * @param result - int
- * @param planID - Plan ID
- * @param status - Current Status
- * @param currentState - Current State
- */
- public WorkStatus(int result, String planID, String status,
- String currentState) {
- this.result = result;
- this.planID = planID;
- this.status = status;
- this.currentState = currentState;
- }
-
- /**
- * Returns result.
- *
- * @return long
- */
- public int getResult() {
- return result;
- }
-
- /**
- * Returns planID.
- *
- * @return String
- */
- public String getPlanID() {
- return planID;
- }
-
- /**
- * Returns Status.
- *
- * @return String
- */
- public String getStatus() {
- return status;
- }
-
- /**
- * Gets current Status.
- *
- * @return - Json String
- */
- public String getCurrentState() {
- return currentState;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index d3bdedf..27de7d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -21,6 +21,9 @@ HDFS-1312 Change Log
HDFS-9645. DiskBalancer: Add Query RPC. (Anu Engineer via Arpit Agarwal)
- HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer
- via Arpit Agarwal)
+ HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer via
+ Arpit Agarwal)
+
+ HDFS-9671. DiskBalancer: SubmitPlan implementation. (Anu Engineer via
+ Arpit Agarwal)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f18a6c6..224ab3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -930,6 +930,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
+ // Disk Balancer Keys
+ public static final String DFS_DISK_BALANCER_ENABLED =
+ "dfs.disk.balancer.enabled";
+ public static final boolean DFS_DISK_BALANCER_ENABLED_DEFAULT = false;
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 3246633..692fca3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
/**
* Implementation for protobuf service that forwards requests
@@ -293,7 +293,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
RpcController controller, QueryPlanStatusRequestProto request)
throws ServiceException {
try {
- WorkStatus result = impl.queryDiskBalancerPlan();
+ DiskBalancerWorkStatus result = impl.queryDiskBalancerPlan();
return QueryPlanStatusResponseProto
.newBuilder()
.setResult(result.getResult())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 836dc81..8d805a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -169,7 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
-import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -386,6 +386,8 @@ public class DataNode extends ReconfigurableBase
private static final int NUM_CORES = Runtime.getRuntime()
.availableProcessors();
private static final double CONGESTION_RATIO = 1.5;
+ private DiskBalancer diskBalancer;
+
private static Tracer createTracer(Configuration conf) {
return new Tracer.Builder("DataNode").
@@ -1022,7 +1024,33 @@ public class DataNode extends ReconfigurableBase
directoryScanner.shutdown();
}
}
-
+
+ /**
+ * Initializes {@link DiskBalancer}.
+ * @param data - FSDataSet
+ * @param conf - Config
+ */
+ private synchronized void initDiskBalancer(FsDatasetSpi data,
+ Configuration conf) {
+ if (this.diskBalancer != null) {
+ return;
+ }
+
+ DiskBalancer.BlockMover mover = new DiskBalancer.DiskBalancerMover(data,
+ conf);
+ this.diskBalancer = new DiskBalancer(getDatanodeUuid(), conf, mover);
+ }
+
+ /**
+ * Shutdown disk balancer.
+ */
+ private synchronized void shutdownDiskBalancer() {
+ if (this.diskBalancer != null) {
+ this.diskBalancer.shutdown();
+ this.diskBalancer = null;
+ }
+ }
+
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
TcpPeerServer tcpPeerServer;
@@ -1530,6 +1558,7 @@ public class DataNode extends ReconfigurableBase
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
initDirectoryScanner(conf);
+ initDiskBalancer(data, conf);
}
List<BPOfferService> getAllBpOs() {
@@ -1867,6 +1896,7 @@ public class DataNode extends ReconfigurableBase
// Terminate directory scanner and block scanner
shutdownPeriodicScanners();
+ shutdownDiskBalancer();
// Stop the web server
if (httpServer != null) {
@@ -3296,31 +3326,30 @@ public class DataNode extends ReconfigurableBase
* @param bandwidth - Max disk bandwidth to use, 0 means use value defined
* in the configration.
* @param plan - Actual plan
- * @return success or throws an exception.
- * @throws Exception
+ * @throws IOException
*/
@Override
public void submitDiskBalancerPlan(String planID,
long planVersion, long bandwidth, String plan) throws IOException {
- // TODO : This will be replaced with actual code later.
- // Right now throwing DiskbalancerException instead
- // NotImplementedException to indicate the eventually disk balancer code
- // will throw DiskbalancerException.
- throw new DiskbalancerException("Not Implemented", 0);
+ checkSuperuserPrivilege();
+ // TODO : Support force option
+ this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false);
}
@Override
public void cancelDiskBalancePlan(String planID) throws
IOException {
checkSuperuserPrivilege();
- throw new DiskbalancerException("Not Implemented", 0);
+ throw new DiskBalancerException("Not Implemented",
+ DiskBalancerException.Result.INTERNAL_ERROR);
}
@Override
- public WorkStatus queryDiskBalancerPlan() throws IOException {
+ public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
checkSuperuserPrivilege();
- throw new DiskbalancerException("Not Implemented", 0);
+ throw new DiskBalancerException("Not Implemented",
+ DiskBalancerException.Result.INTERNAL_ERROR);
}
/**
@@ -3334,6 +3363,7 @@ public class DataNode extends ReconfigurableBase
@Override
public String getDiskBalancerSetting(String key) throws IOException {
checkSuperuserPrivilege();
- throw new DiskbalancerException("Not Implemented", 0);
+ throw new DiskBalancerException("Not Implemented",
+ DiskBalancerException.Result.INTERNAL_ERROR);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
new file mode 100644
index 0000000..1c8ba4cf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
+import org.apache.hadoop.util.Time;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Worker class for Disk Balancer.
+ * <p/>
+ * Here is the high level logic executed by this class. Users can submit disk
+ * balancing plans using submitPlan calls. After a set of sanity checks the plan
+ * is admitted and put into workMap.
+ * <p/>
+ * The executePlan launches a thread that picks up work from workMap and hands
+ * it over to the BlockMover#copyBlocks function.
+ * <p/>
+ * Constraints :
+ * <p/>
+ * Only one plan can be executing in a datanode at any given time. This is
+ * ensured by checking the future handle of the worker thread in submitPlan.
+ */
+@InterfaceAudience.Private
+public class DiskBalancer {
+
+ private static final Log LOG = LogFactory.getLog(DiskBalancer.class);
+ private final FsDatasetSpi<?> dataset;
+ private final String dataNodeUUID;
+ private final BlockMover blockMover;
+ private final ReentrantLock lock;
+ private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
+ private boolean isDiskBalancerEnabled = false;
+ private ExecutorService scheduler;
+ private Future future;
+ private String planID;
+
+ /**
+ * Constructs a Disk Balancer object. This object takes care of reading a
+ * NodePlan and executing it against a set of volumes.
+ *
+ * @param dataNodeUUID - Data node UUID
+ * @param conf - Hdfs Config
+ * @param blockMover - Object that supports moving blocks.
+ */
+ public DiskBalancer(String dataNodeUUID,
+ Configuration conf, BlockMover blockMover) {
+ this.blockMover = blockMover;
+ this.dataset = this.blockMover.getDataset();
+ this.dataNodeUUID = dataNodeUUID;
+ scheduler = Executors.newSingleThreadExecutor();
+ lock = new ReentrantLock();
+ workMap = new ConcurrentHashMap<>();
+ this.isDiskBalancerEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
+ DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
+ }
+
+ /**
+ * Shutdown disk balancer services.
+ */
+ public void shutdown() {
+ lock.lock();
+ try {
+ this.isDiskBalancerEnabled = false;
+ if ((this.future != null) && (!this.future.isDone())) {
+ this.blockMover.setExitFlag();
+ shutdownExecutor();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Shutdown the executor.
+ */
+ private void shutdownExecutor() {
+ scheduler.shutdown();
+ try {
+ if(!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOG.error("Disk Balancer : Scheduler did not terminate.");
+ }
+ }
+ } catch (InterruptedException ex) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Takes a client submitted plan and converts into a set of work items that
+ * can be executed by the blockMover.
+ *
+ * @param planID - A SHA512 of the plan string
+ * @param planVersion - version of the plan string - for future use.
+ * @param plan - Actual Plan
+ * @param bandwidth - BytesPerSec to copy
+ * @param force - Skip some validations and execute the plan file.
+ * @throws DiskBalancerException
+ */
+ public void submitPlan(String planID, long planVersion, String plan,
+ long bandwidth, boolean force)
+ throws DiskBalancerException {
+
+ lock.lock();
+ try {
+ checkDiskBalancerEnabled();
+ if ((this.future != null) && (!this.future.isDone())) {
+ LOG.error("Disk Balancer - Executing another plan, submitPlan failed.");
+ throw new DiskBalancerException("Executing another plan",
+ DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
+ }
+ NodePlan nodePlan =
+ verifyPlan(planID, planVersion, plan, bandwidth, force);
+ createWorkPlan(nodePlan);
+ this.planID = planID;
+ executePlan();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Throws if Disk balancer is disabled.
+ *
+ * @throws DiskBalancerException
+ */
+ private void checkDiskBalancerEnabled()
+ throws DiskBalancerException {
+ if (!isDiskBalancerEnabled) {
+ LOG.error("Disk Balancer is not enabled.");
+ throw new DiskBalancerException("Disk Balancer is not enabled.",
+ DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED);
+ }
+ }
+
+ /**
+ * Verifies that user provided plan is valid.
+ *
+ * @param planID - SHA 512 of the plan.
+ * @param planVersion - Version of the plan, for future use.
+ * @param plan - Plan String in Json.
+ * @param bandwidth - Max disk bandwidth to use per second.
+ * @param force - Skip verifying when the plan was generated.
+ * @return a NodePlan Object.
+ * @throws DiskBalancerException
+ */
+ private NodePlan verifyPlan(String planID, long planVersion, String plan,
+ long bandwidth, boolean force)
+ throws DiskBalancerException {
+
+ Preconditions.checkState(lock.isHeldByCurrentThread());
+ verifyPlanVersion(planVersion);
+ NodePlan nodePlan = verifyPlanHash(planID, plan);
+ if (!force) {
+ verifyTimeStamp(nodePlan);
+ }
+ verifyNodeUUID(nodePlan);
+ return nodePlan;
+ }
+
+ /**
+  * Rejects any plan whose version lies outside the supported range.
+  *
+  * @param planVersion - Long version.
+  * @throws DiskBalancerException with INVALID_PLAN_VERSION if unsupported.
+  */
+ private void verifyPlanVersion(long planVersion)
+     throws DiskBalancerException {
+   if ((planVersion >= DiskBalancerConstants.DISKBALANCER_MIN_VERSION) &&
+       (planVersion <= DiskBalancerConstants.DISKBALANCER_MAX_VERSION)) {
+     return;
+   }
+   LOG.error("Disk Balancer - Invalid plan version.");
+   throw new DiskBalancerException("Invalid plan version.",
+       DiskBalancerException.Result.INVALID_PLAN_VERSION);
+ }
+
+ /**
+  * Verifies that plan matches the SHA512 provided by the client, then parses
+  * it.
+  *
+  * @param planID - Sha512 Hex Bytes
+  * @param plan - Plan String
+  * @return the NodePlan parsed from the plan string.
+  * @throws DiskBalancerException if the plan is null or empty (INVALID_PLAN),
+  *         the hash is missing, mis-sized or mis-matched (INVALID_PLAN_HASH),
+  *         or the plan cannot be parsed (MALFORMED_PLAN).
+  */
+ private NodePlan verifyPlanHash(String planID, String plan)
+     throws DiskBalancerException {
+   final long sha512Length = 128;
+   if (plan == null || plan.length() == 0) {
+     LOG.error("Disk Balancer - Invalid plan.");
+     throw new DiskBalancerException("Invalid plan.",
+         DiskBalancerException.Result.INVALID_PLAN);
+   }
+
+   // The null and length tests short-circuit before the digest is computed.
+   if ((planID == null) ||
+       (planID.length() != sha512Length) ||
+       !DigestUtils.sha512Hex(plan.getBytes(Charset.forName("UTF-8")))
+           .equalsIgnoreCase(planID)) {
+     LOG.error("Disk Balancer - Invalid plan hash.");
+     throw new DiskBalancerException("Invalid or mis-matched hash.",
+         DiskBalancerException.Result.INVALID_PLAN_HASH);
+   }
+
+   try {
+     return NodePlan.parseJson(plan);
+   } catch (IOException ex) {
+     // Log like the other rejection paths so a failed submit is traceable.
+     LOG.error("Disk Balancer - Parsing plan failed.", ex);
+     throw new DiskBalancerException("Parsing plan failed.", ex,
+         DiskBalancerException.Result.MALFORMED_PLAN);
+   }
+ }
+
+ /**
+ * Verifies that this plan is not older than 24 hours.
+ *
+ * @param plan - Node Plan
+ */
+ private void verifyTimeStamp(NodePlan plan) throws DiskBalancerException {
+ long now = Time.now();
+ long planTime = plan.getTimeStamp();
+
+ // TODO : Support Valid Plan hours as a user configurable option.
+ if ((planTime +
+ (TimeUnit.HOURS.toMillis(
+ DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS))) < now) {
+ String hourString = "Plan was generated more than " +
+ Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
+ + " hours ago.";
+ LOG.error("Disk Balancer - " + hourString);
+ throw new DiskBalancerException(hourString,
+ DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
+ }
+ }
+
+ /**
+ * Verify Node UUID.
+ *
+ * @param plan - Node Plan
+ */
+ private void verifyNodeUUID(NodePlan plan) throws DiskBalancerException {
+ if ((plan.getNodeUUID() == null) ||
+ !plan.getNodeUUID().equals(this.dataNodeUUID)) {
+ LOG.error("Disk Balancer - Plan was generated for another node.");
+ throw new DiskBalancerException(
+ "Plan was generated for another node.",
+ DiskBalancerException.Result.DATANODE_ID_MISMATCH);
+ }
+ }
+
+ /**
+ * Convert a node plan to DiskBalancerWorkItem that Datanode can execute.
+ *
+ * @param plan - Node Plan
+ */
+ private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
+ Preconditions.checkState(lock.isHeldByCurrentThread());
+
+ // Cleanup any residual work in the map.
+ workMap.clear();
+ Map<String, FsVolumeSpi> pathMap = getStorageIDToVolumeMap();
+
+ for (Step step : plan.getVolumeSetPlans()) {
+ String sourceuuid = step.getSourceVolume().getUuid();
+ String destinationuuid = step.getDestinationVolume().getUuid();
+
+ FsVolumeSpi sourceVol = pathMap.get(sourceuuid);
+ if (sourceVol == null) {
+ LOG.error("Disk Balancer - Unable to find source volume. submitPlan " +
+ "failed.");
+ throw new DiskBalancerException("Unable to find source volume.",
+ DiskBalancerException.Result.INVALID_VOLUME);
+ }
+
+ FsVolumeSpi destVol = pathMap.get(destinationuuid);
+ if (destVol == null) {
+ LOG.error("Disk Balancer - Unable to find destination volume. " +
+ "submitPlan failed.");
+ throw new DiskBalancerException("Unable to find destination volume.",
+ DiskBalancerException.Result.INVALID_VOLUME);
+ }
+ createWorkPlan(sourceVol, destVol, step.getBytesToMove());
+ }
+ }
+
+ /**
+  * Returns a storage ID to Volume Map.
+  *
+  * @return Map keyed by each volume's storage ID.
+  * @throws DiskBalancerException with INTERNAL_ERROR if an IOException is
+  *         raised while reading or releasing the volume references.
+  */
+ private Map<String, FsVolumeSpi> getStorageIDToVolumeMap()
+     throws DiskBalancerException {
+   Map<String, FsVolumeSpi> pathMap = new HashMap<>();
+   try {
+     synchronized (this.dataset) {
+       // try-with-resources releases the volume references even when the
+       // loop below throws part way through.
+       try (FsDatasetSpi.FsVolumeReferences references =
+           this.dataset.getFsVolumeReferences()) {
+         for (int ndx = 0; ndx < references.size(); ndx++) {
+           FsVolumeSpi vol = references.get(ndx);
+           pathMap.put(vol.getStorageID(), vol);
+         }
+       }
+     }
+   } catch (IOException ex) {
+     LOG.error("Disk Balancer - Internal Error.", ex);
+     throw new DiskBalancerException("Internal error", ex,
+         DiskBalancerException.Result.INTERNAL_ERROR);
+   }
+   return pathMap;
+ }
+
+ /**
+ * Submits the current work plan to the scheduler and returns without
+ * waiting for it. The submitted task calls blockMover.copyBlocks for each
+ * entry of workMap; its Future is stored in this.future. The scheduler is
+ * replaced with a new single thread executor if it was shut down. The
+ * caller must already hold the lock.
+ */
+ private void executePlan() {
+ Preconditions.checkState(lock.isHeldByCurrentThread());
+ this.blockMover.setRunnable();
+ if (this.scheduler.isShutdown()) {
+ this.scheduler = Executors.newSingleThreadExecutor();
+ }
+
+ this.future = scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ Thread.currentThread().setName("DiskBalancerThread");
+ LOG.info("Executing Disk balancer plan. Plan ID - " + planID);
+
+ for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+ workMap.entrySet()) {
+ blockMover.copyBlocks(entry.getKey(), entry.getValue());
+ }
+ }
+ });
+ }
+
+ /**
+ * Insert work items to work map.
+ *
+ * @param source - Source vol
+ * @param dest - destination volume
+ * @param bytesToMove - number of bytes to move
+ */
+ private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
+ long bytesToMove) throws DiskBalancerException {
+
+ if(source.getStorageID().equals(dest.getStorageID())) {
+ throw new DiskBalancerException("Same source and destination",
+ DiskBalancerException.Result.INVALID_MOVE);
+ }
+ VolumePair pair = new VolumePair(source, dest);
+
+ // In case we have a plan with more than
+ // one line of same <source, dest>
+ // we compress that into one work order.
+ if (workMap.containsKey(pair)) {
+ bytesToMove += workMap.get(pair).getBytesToCopy();
+ }
+
+ DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
+ workMap.put(pair, work);
+ }
+
+ /**
+ * BlockMover supports moving blocks across Volumes.
+ */
+ public interface BlockMover {
+ /**
+ * Copies blocks from the source volume of a pair to its destination volume.
+ *
+ * @param pair - Source and Destination Volumes.
+ * @param item - Work item for this pair; it carries the number of bytes
+ * to copy.
+ */
+ void copyBlocks(VolumePair pair, DiskBalancerWorkItem item);
+
+ /**
+ * Begin the actual copy operations. This is useful in testing.
+ */
+ void setRunnable();
+
+ /**
+ * Tells copyBlocks to exit from the copy routine.
+ */
+ void setExitFlag();
+
+ /**
+ * Returns a pointer to the current dataset we are operating against.
+ *
+ * @return FsDatasetSpi
+ */
+ FsDatasetSpi getDataset();
+ }
+
+ /**
+ * Holds references to actual volumes that we will be operating against.
+ * Instances are used as keys of the work map, so equals and hashCode are
+ * overridden.
+ * NOTE(review): equals compares the volume objects themselves while
+ * hashCode is derived from getBasePath(); confirm that volumes which
+ * compare equal always report the same base path.
+ */
+ static class VolumePair {
+ private final FsVolumeSpi source;
+ private final FsVolumeSpi dest;
+
+ /**
+ * Constructs a volume pair.
+ *
+ * @param source - Source Volume
+ * @param dest - Destination Volume
+ */
+ public VolumePair(FsVolumeSpi source, FsVolumeSpi dest) {
+ this.source = source;
+ this.dest = dest;
+ }
+
+ /**
+ * Gets source volume.
+ *
+ * @return volume
+ */
+ public FsVolumeSpi getSource() {
+ return source;
+ }
+
+ /**
+ * Gets Destination volume.
+ *
+ * @return volume.
+ */
+ public FsVolumeSpi getDest() {
+ return dest;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ VolumePair that = (VolumePair) o;
+ return source.equals(that.source) && dest.equals(that.dest);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = source.getBasePath().hashCode();
+ result = 31 * result + dest.getBasePath().hashCode();
+ return result;
+ }
+ }
+
+ /**
+ * Actual DataMover class for DiskBalancer.
+ * <p>
+ * TODO : Add implementation for this class. This is here as a place holder so
+ * that Datanode can make calls into this class. For now copyBlocks,
+ * setRunnable and setExitFlag have empty bodies.
+ */
+ public static class DiskBalancerMover implements BlockMover {
+ private final FsDatasetSpi dataset;
+
+ /**
+ * Constructs diskBalancerMover.
+ *
+ * @param dataset Dataset returned later by getDataset()
+ * @param conf Configuration; not read yet (see the TODO in the body)
+ */
+ public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
+ this.dataset = dataset;
+ // TODO : Read Config values.
+ }
+
+ /**
+ * Copies blocks from a set of volumes. Place holder: currently does
+ * nothing.
+ *
+ * @param pair - Source and Destination Volumes.
+ * @param item - Work item for this pair.
+ */
+ @Override
+ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
+
+ }
+
+ /**
+ * Begin the actual copy operations. This is useful in testing.
+ * Place holder: currently does nothing.
+ */
+ @Override
+ public void setRunnable() {
+
+ }
+
+ /**
+ * Tells copyBlocks to exit from the copy routine.
+ * Place holder: currently does nothing.
+ */
+ @Override
+ public void setExitFlag() {
+
+ }
+
+ /**
+ * Returns a pointer to the current dataset we are operating against.
+ *
+ * @return FsDatasetSpi passed to the constructor
+ */
+ @Override
+ public FsDatasetSpi getDataset() {
+ return dataset;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
index 553827e..7144a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
@@ -29,6 +29,15 @@ public final class DiskBalancerConstants {
public static final String DISKBALANCER_VOLUME_NAME =
"DiskBalancerVolumeName";
+ /** Min and Max Plan file versions that we know of. **/
+ public static final int DISKBALANCER_MIN_VERSION = 1;
+ public static final int DISKBALANCER_MAX_VERSION = 1;
+
+ /**
+ * A plan is treated as stale once it is older than the number of hours
+ * defined by the constant below. Defaults to 24 hours.
+ */
+ public static final int DISKBALANCER_VALID_PLAN_HOURS = 24;
// never constructed.
private DiskBalancerConstants() {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
new file mode 100644
index 0000000..a5e1581
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import java.io.IOException;
+
+/**
+ * Exception raised by Disk Balancer code; carries a {@link Result} code.
+ */
+public class DiskBalancerException extends IOException {
+ /** Possible results from DiskBalancer. **/
+ public enum Result {
+ DISK_BALANCER_NOT_ENABLED,
+ INVALID_PLAN_VERSION,
+ INVALID_PLAN,
+ INVALID_PLAN_HASH,
+ OLD_PLAN_SUBMITTED,
+ DATANODE_ID_MISMATCH,
+ MALFORMED_PLAN,
+ PLAN_ALREADY_IN_PROGRESS,
+ INVALID_VOLUME,
+ INVALID_MOVE,
+ INTERNAL_ERROR
+ }
+
+ private final Result result;
+
+ /**
+ * Constructs a {@code DiskBalancerException} with a detail message.
+ *
+ * @param message The detail message, retrievable via {@link #getMessage()}
+ * @param result The {@link Result} code identifying the failure
+ */
+ public DiskBalancerException(String message, Result result) {
+ super(message);
+ this.result = result;
+ }
+
+ /**
+ * Constructs a {@code DiskBalancerException} with the specified detail
+ * message and cause.
+ *
+ * <p>Note that the detail message associated with {@code cause} is
+ * <i>not</i> automatically incorporated into this exception's detail
+ * message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the {@link #getMessage()} method)
+ * @param cause The cause (which is saved for later retrieval by the {@link
+ * #getCause()} method). (A null value is permitted, and
+ * indicates that the cause is nonexistent or unknown.)
+ * @param result The {@link Result} code identifying the failure
+ */
+ public DiskBalancerException(String message, Throwable cause, Result result) {
+ super(message, cause);
+ this.result = result;
+ }
+
+ /**
+ * Constructs a {@code DiskBalancerException} with the specified cause and a
+ * detail message of {@code (cause==null ? null : cause.toString())} (which
+ * typically contains the class and detail message of {@code cause}). This
+ * constructor is useful for IO exceptions that are little more than
+ * wrappers for other throwables.
+ *
+ * @param cause The cause (which is saved for later retrieval by the {@link
+ * #getCause()} method). (A null value is permitted, and
+ * indicates that the cause is nonexistent or unknown.)
+ * @param result The {@link Result} code identifying the failure
+ */
+ public DiskBalancerException(Throwable cause, Result result) {
+ super(cause);
+ this.result = result;
+ }
+
+ /**
+ * Returns the result.
+ * @return the {@link Result} code supplied at construction
+ */
+ public Result getResult() {
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
deleted file mode 100644
index 9d47dc3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdfs.server.diskbalancer;
-
-import java.io.IOException;
-
-/**
- * Disk Balancer Exceptions.
- */
-public class DiskbalancerException extends IOException {
- private int result;
-
- /**
- * Constructs an {@code IOException} with the specified detail message.
- *
- * @param message The detail message (which is saved for later retrieval by
- * the
- * {@link #getMessage()} method)
- */
- public DiskbalancerException(String message, int result) {
- super(message);
- this.result = result;
- }
-
- /**
- * Constructs an {@code IOException} with the specified detail message and
- * cause.
- * <p/>
- * <p> Note that the detail message associated with {@code cause} is
- * <i>not</i>
- * automatically incorporated into this exception's detail message.
- *
- * @param message The detail message (which is saved for later retrieval by
- * the
- * {@link #getMessage()} method)
- * @param cause The cause (which is saved for later retrieval by the {@link
- * #getCause()} method). (A null value is permitted, and
- * indicates that the cause is nonexistent or unknown.)
- * @since 1.6
- */
- public DiskbalancerException(String message, Throwable cause, int result) {
- super(message, cause);
- this.result = result;
- }
-
- /**
- * Constructs an {@code IOException} with the specified cause and a detail
- * message of {@code (cause==null ? null : cause.toString())} (which typically
- * contains the class and detail message of {@code cause}). This
- * constructor is
- * useful for IO exceptions that are little more than wrappers for other
- * throwables.
- *
- * @param cause The cause (which is saved for later retrieval by the {@link
- * #getCause()} method). (A null value is permitted, and
- * indicates
- * that the cause is nonexistent or unknown.)
- * @since 1.6
- */
- public DiskbalancerException(Throwable cause, int result) {
- super(cause);
- this.result = result;
- }
-
- /**
- * Returns the result.
- * @return int
- */
- public int getResult() {
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
index af9e9af..c86fc9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
@@ -358,4 +358,18 @@ public class DiskBalancerCluster {
return (10 - modValue) + threadRatio;
}
}
+
+ /**
+  * Returns a node by UUID.
+  *
+  * @param uuid - Node's UUID
+  * @return DiskBalancerDataNode with that UUID, or null if uuid is null or
+  *         no node matches.
+  */
+ public DiskBalancerDataNode getNodeByUUID(String uuid) {
+   if (uuid == null) {
+     return null;
+   }
+   for (DiskBalancerDataNode node : this.getNodes()) {
+     // Compare from the uuid side so a node that has no UUID is skipped
+     // instead of raising a NullPointerException.
+     if (uuid.equals(node.getDataNodeUUID())) {
+       return node;
+     }
+   }
+   return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 143b776..dc24787 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.diskbalancer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -35,9 +36,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.io.IOException;
-import java.net.URI;
-
public class TestDiskBalancerRPC {
@Rule
public ExpectedException thrown = ExpectedException.none();
@@ -48,6 +46,7 @@ public class TestDiskBalancerRPC {
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
}
@@ -72,22 +71,19 @@ public class TestDiskBalancerRPC {
Assert.assertEquals(cluster.getDataNodes().size(),
diskBalancerCluster.getNodes().size());
diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
- DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(dnIndex);
+
+ DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+ DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
+ dataNode.getDatanodeUuid());
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
());
planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
- final int planVersion = 0; // So far we support only one version.
- DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+ final int planVersion = 1; // So far we support only one version.
String planHash = DigestUtils.sha512Hex(plan.toJson());
- // Since submitDiskBalancerPlan is not implemented yet, it throws an
- // Exception, this will be modified with the actual implementation.
- thrown.expect(DiskbalancerException.class);
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-
-
}
@Test
@@ -117,10 +113,10 @@ public class TestDiskBalancerRPC {
// Exception, this will be modified with the actual implementation.
try {
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
- } catch (DiskbalancerException ex) {
+ } catch (DiskBalancerException ex) {
// Let us ignore this for time being.
}
- thrown.expect(DiskbalancerException.class);
+ thrown.expect(DiskBalancerException.class);
dataNode.cancelDiskBalancePlan(planHash);
}
@@ -152,13 +148,13 @@ public class TestDiskBalancerRPC {
// Exception, this will be modified with the actual implementation.
try {
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
- } catch (DiskbalancerException ex) {
+ } catch (DiskBalancerException ex) {
// Let us ignore this for time being.
}
// TODO : This will be fixed when we have implementation for this
// function in server side.
- thrown.expect(DiskbalancerException.class);
+ thrown.expect(DiskBalancerException.class);
dataNode.queryDiskBalancerPlan();
}
@@ -166,7 +162,7 @@ public class TestDiskBalancerRPC {
public void testgetDiskBalancerSetting() throws Exception {
final int dnIndex = 0;
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
- thrown.expect(DiskbalancerException.class);
+ thrown.expect(DiskBalancerException.class);
dataNode.getDiskBalancerSetting(
DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org