You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 01:36:15 UTC

[19/49] hadoop git commit: HDFS-9671. DiskBalancer: SubmitPlan implementation. (Contributed by Anu Engineer)

HDFS-9671. DiskBalancer: SubmitPlan implementation. (Contributed by Anu Engineer)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b1b2faf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b1b2faf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b1b2faf

Branch: refs/heads/HDFS-1312
Commit: 2b1b2faf76a7ff148650a7836935a85439f60c49
Parents: 66f0bb6
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Feb 22 11:45:51 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:18:48 2016 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/ClientDatanodeProtocol.java   |   4 +-
 .../ClientDatanodeProtocolTranslatorPB.java     |  10 +-
 .../server/datanode/DiskBalancerWorkItem.java   | 160 ++++++
 .../server/datanode/DiskBalancerWorkStatus.java |  87 +++
 .../hadoop/hdfs/server/datanode/WorkStatus.java |  85 ---
 .../hadoop-hdfs/HDFS-1312_CHANGES.txt           |   7 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 ...tDatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  56 +-
 .../hdfs/server/datanode/DiskBalancer.java      | 542 +++++++++++++++++++
 .../diskbalancer/DiskBalancerConstants.java     |   9 +
 .../diskbalancer/DiskBalancerException.java     |  98 ++++
 .../diskbalancer/DiskbalancerException.java     |  86 ---
 .../datamodel/DiskBalancerCluster.java          |  14 +
 .../diskbalancer/TestDiskBalancerRPC.java       |  28 +-
 15 files changed, 984 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index dede89e..d8df7fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 
 /** An client-datanode protocol for block recovery
  */
@@ -182,7 +182,7 @@ public interface ClientDatanodeProtocol {
   /**
    * Gets the status of an executing diskbalancer Plan.
    */
-  WorkStatus queryDiskBalancerPlan() throws IOException;
+  DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException;
 
   /**
    * Gets a run-time configuration value from running diskbalancer instance.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index e7e0d94..786d834 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryP
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
@@ -345,8 +345,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
    *                  to zero allows datanode to use the value defined in
    *                  configration.
    * @param plan - Actual plan.
-   * @return Success or throws Exception.
-   * @throws Exception
+   * @throws IOException
    */
   @Override
   public void submitDiskBalancerPlan(String planID, long planVersion,
@@ -387,13 +386,14 @@ public class ClientDatanodeProtocolTranslatorPB implements
    * Gets the status of an executing diskbalancer Plan.
    */
   @Override
-  public WorkStatus queryDiskBalancerPlan() throws IOException {
+  public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
     try {
       QueryPlanStatusRequestProto request =
           QueryPlanStatusRequestProto.newBuilder().build();
       QueryPlanStatusResponseProto response =
           rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request);
-      return new WorkStatus(response.hasResult() ? response.getResult() : 0,
+      return new DiskBalancerWorkStatus(response.hasResult() ?
+          response.getResult() : 0,
           response.hasPlanID() ? response.getPlanID() : null,
           response.hasStatus() ? response.getStatus() : null,
           response.hasCurrentStatus() ? response.getCurrentStatus() : null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
new file mode 100644
index 0000000..11730e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Keeps track of how much work has finished.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DiskBalancerWorkItem {
+  private final long bytesToCopy;
+  private long bytesCopied;
+  private long errorCount;
+  private String errMsg;
+  private long blocksCopied;
+
+  /**
+   * Constructs a DiskBalancerWorkItem.
+   *
+   * @param bytesToCopy - Total bytes to copy from a disk
+   * @param bytesCopied - Copied So far.
+   */
+  public DiskBalancerWorkItem(long bytesToCopy, long bytesCopied) {
+    this.bytesToCopy = bytesToCopy;
+    this.bytesCopied = bytesCopied;
+  }
+
+  /**
+   * Reads a DiskBalancerWorkItem Object from a Json String.
+   *
+   * @param json - Json String.
+   * @return DiskBalancerWorkItem Object
+   * @throws IOException
+   */
+  public static DiskBalancerWorkItem parseJson(String json) throws IOException {
+    Preconditions.checkNotNull(json);
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.readValue(json, DiskBalancerWorkItem.class);
+  }
+
+  /**
+   * Gets the error message.
+   */
+  public String getErrMsg() {
+    return errMsg;
+  }
+
+  /**
+   * Sets the error message.
+   *
+   * @param errMsg - Msg.
+   */
+  public void setErrMsg(String errMsg) {
+    this.errMsg = errMsg;
+  }
+
+  /**
+   * Returns the number of errors encountered.
+   *
+   * @return long
+   */
+  public long getErrorCount() {
+    return errorCount;
+  }
+
+  /**
+   * Incs Error Count.
+   */
+  public void incErrorCount() {
+    this.errorCount++;
+  }
+
+  /**
+   * Returns bytes copied so far.
+   *
+   * @return long
+   */
+  public long getBytesCopied() {
+    return bytesCopied;
+  }
+
+  /**
+   * Sets bytes copied so far.
+   *
+   * @param bytesCopied - long
+   */
+  public void setBytesCopied(long bytesCopied) {
+    this.bytesCopied = bytesCopied;
+  }
+
+  /**
+   * Increments bytesCopied by delta.
+   *
+   * @param delta - long
+   */
+  public void incCopiedSoFar(long delta) {
+    this.bytesCopied += delta;
+  }
+
+  /**
+   * Returns bytes to copy.
+   *
+   * @return - long
+   */
+  public long getBytesToCopy() {
+    return bytesToCopy;
+  }
+
+  /**
+   * Returns number of blocks copied for this DiskBalancerWorkItem.
+   *
+   * @return long count of blocks.
+   */
+  public long getBlocksCopied() {
+    return blocksCopied;
+  }
+
+  /**
+   * increments the number of blocks copied.
+   */
+  public void incBlocksCopied() {
+    blocksCopied++;
+  }
+
+  /**
+   * returns a serialized json string.
+   *
+   * @return String - json
+   * @throws IOException
+   */
+  public String toJson() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.writeValueAsString(this);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
new file mode 100644
index 0000000..6b29ce8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Helper class that reports how much work has been done by the node.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DiskBalancerWorkStatus {
+  private final int result;
+  private final String planID;
+  private final String status;
+  private final String currentState;
+
+  /**
+   * Constructs a workStatus Object.
+   *
+   * @param result       - int
+   * @param planID       - Plan ID
+   * @param status       - Current Status
+   * @param currentState - Current State
+   */
+  public DiskBalancerWorkStatus(int result, String planID, String status,
+                                String currentState) {
+    this.result = result;
+    this.planID = planID;
+    this.status = status;
+    this.currentState = currentState;
+  }
+
+  /**
+   * Returns result.
+   *
+   * @return int
+   */
+  public int getResult() {
+    return result;
+  }
+
+  /**
+   * Returns planID.
+   *
+   * @return String
+   */
+  public String getPlanID() {
+    return planID;
+  }
+
+  /**
+   * Returns Status.
+   *
+   * @return String
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  /**
+   * Gets current State.
+   *
+   * @return - Json String
+   */
+  public String getCurrentState() {
+    return currentState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java
deleted file mode 100644
index 259a311..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Helper class that reports how much work has has been done by the node.
- */
-@InterfaceAudience.Private
-public class WorkStatus {
-  private int result;
-  private String planID;
-  private String status;
-  private String currentState;
-
-  /**
-   * Constructs a workStatus Object.
-   *
-   * @param result       - int
-   * @param planID       - Plan ID
-   * @param status       - Current Status
-   * @param currentState - Current State
-   */
-  public WorkStatus(int result, String planID, String status,
-                    String currentState) {
-    this.result = result;
-    this.planID = planID;
-    this.status = status;
-    this.currentState = currentState;
-  }
-
-  /**
-   * Returns result.
-   *
-   * @return long
-   */
-  public int getResult() {
-    return result;
-  }
-
-  /**
-   * Returns planID.
-   *
-   * @return String
-   */
-  public String getPlanID() {
-    return planID;
-  }
-
-  /**
-   * Returns Status.
-   *
-   * @return String
-   */
-  public String getStatus() {
-    return status;
-  }
-
-  /**
-   * Gets current Status.
-   *
-   * @return - Json String
-   */
-  public String getCurrentState() {
-    return currentState;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index d3bdedf..27de7d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -21,6 +21,9 @@ HDFS-1312 Change Log
 
     HDFS-9645. DiskBalancer: Add Query RPC. (Anu Engineer via Arpit Agarwal)
 
-    HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer
-    via Arpit Agarwal)
+    HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer via
+    Arpit Agarwal)
+
+    HDFS-9671. DiskBalancer: SubmitPlan implementation. (Anu Engineer via
+    Arpit Agarwal)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f18a6c6..224ab3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -930,6 +930,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
       HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
 
+  // Disk Balancer Keys
+  public static final String DFS_DISK_BALANCER_ENABLED =
+      "dfs.disk.balancer.enabled";
+  public static final boolean DFS_DISK_BALANCER_ENABLED_DEFAULT = false;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 3246633..692fca3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 
 /**
  * Implementation for protobuf service that forwards requests
@@ -293,7 +293,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       RpcController controller, QueryPlanStatusRequestProto request)
       throws ServiceException {
     try {
-      WorkStatus result = impl.queryDiskBalancerPlan();
+      DiskBalancerWorkStatus result = impl.queryDiskBalancerPlan();
       return QueryPlanStatusResponseProto
           .newBuilder()
           .setResult(result.getResult())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 836dc81..8d805a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -169,7 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
-import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -386,6 +386,8 @@ public class DataNode extends ReconfigurableBase
   private static final int NUM_CORES = Runtime.getRuntime()
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
+  private DiskBalancer diskBalancer;
+
 
   private static Tracer createTracer(Configuration conf) {
     return new Tracer.Builder("DataNode").
@@ -1022,7 +1024,33 @@ public class DataNode extends ReconfigurableBase
       directoryScanner.shutdown();
     }
   }
-  
+
+  /**
+   * Initializes {@link DiskBalancer}.
+   * @param  data - FSDataSet
+   * @param conf - Config
+   */
+  private synchronized void initDiskBalancer(FsDatasetSpi data,
+                                             Configuration conf) {
+    if (this.diskBalancer != null) {
+      return;
+    }
+
+    DiskBalancer.BlockMover mover = new DiskBalancer.DiskBalancerMover(data,
+        conf);
+    this.diskBalancer = new DiskBalancer(getDatanodeUuid(), conf, mover);
+  }
+
+  /**
+   * Shutdown disk balancer.
+   */
+  private synchronized void shutdownDiskBalancer() {
+    if (this.diskBalancer != null) {
+      this.diskBalancer.shutdown();
+      this.diskBalancer = null;
+    }
+  }
+
   private void initDataXceiver(Configuration conf) throws IOException {
     // find free port or use privileged port provided
     TcpPeerServer tcpPeerServer;
@@ -1530,6 +1558,7 @@ public class DataNode extends ReconfigurableBase
     data.addBlockPool(nsInfo.getBlockPoolID(), conf);
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     initDirectoryScanner(conf);
+    initDiskBalancer(data, conf);
   }
 
   List<BPOfferService> getAllBpOs() {
@@ -1867,6 +1896,7 @@ public class DataNode extends ReconfigurableBase
 
     // Terminate directory scanner and block scanner
     shutdownPeriodicScanners();
+    shutdownDiskBalancer();
 
     // Stop the web server
     if (httpServer != null) {
@@ -3296,31 +3326,30 @@ public class DataNode extends ReconfigurableBase
    * @param bandwidth - Max disk bandwidth to use, 0 means use value defined
    *                  in the configration.
    * @param plan - Actual plan
-   * @return  success or throws an exception.
-   * @throws Exception
+   * @throws IOException
    */
   @Override
   public void submitDiskBalancerPlan(String planID,
       long planVersion, long bandwidth, String plan) throws IOException {
 
-    // TODO : This will be replaced with actual code later.
-    // Right now throwing DiskbalancerException instead
-    // NotImplementedException to indicate the eventually disk balancer code
-    // will throw DiskbalancerException.
-    throw new DiskbalancerException("Not Implemented", 0);
+    checkSuperuserPrivilege();
+    // TODO : Support force option
+    this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false);
   }
 
   @Override
   public void cancelDiskBalancePlan(String planID) throws
       IOException {
     checkSuperuserPrivilege();
-    throw new DiskbalancerException("Not Implemented", 0);
+    throw new DiskBalancerException("Not Implemented",
+        DiskBalancerException.Result.INTERNAL_ERROR);
   }
 
   @Override
-  public WorkStatus queryDiskBalancerPlan() throws IOException {
+  public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
     checkSuperuserPrivilege();
-    throw new DiskbalancerException("Not Implemented", 0);
+    throw new DiskBalancerException("Not Implemented",
+        DiskBalancerException.Result.INTERNAL_ERROR);
   }
 
   /**
@@ -3334,6 +3363,7 @@ public class DataNode extends ReconfigurableBase
   @Override
   public String getDiskBalancerSetting(String key) throws IOException {
     checkSuperuserPrivilege();
-    throw new DiskbalancerException("Not Implemented", 0);
+    throw new DiskBalancerException("Not Implemented",
+        DiskBalancerException.Result.INTERNAL_ERROR);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
new file mode 100644
index 0000000..1c8ba4cf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
+import org.apache.hadoop.util.Time;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Worker class for Disk Balancer.
+ * <p/>
+ * Here is the high level logic executed by this class. Users can submit disk
+ * balancing plans using submitPlan calls. After a set of sanity checks the plan
+ * is admitted and put into workMap.
+ * <p/>
+ * The executePlan launches a thread that picks up work from workMap and hands
+ * it over to the BlockMover#copyBlocks function.
+ * <p/>
+ * Constraints :
+ * <p/>
+ * Only one plan can be executing in a datanode at any given time. This is
+ * ensured by checking the future handle of the worker thread in submitPlan.
+ */
+@InterfaceAudience.Private
+public class DiskBalancer {
+
+  private static final Log LOG = LogFactory.getLog(DiskBalancer.class);
+  private final FsDatasetSpi<?> dataset;
+  private final String dataNodeUUID;
+  private final BlockMover blockMover;
+  private final ReentrantLock lock;
+  private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
+  private boolean isDiskBalancerEnabled = false;
+  private ExecutorService scheduler;
+  private Future future;
+  private String planID;
+
+  /**
+   * Constructs a Disk Balancer object. This object takes care of reading a
+   * NodePlan and executing it against a set of volumes.
+   *
+   * @param dataNodeUUID - Data node UUID
+   * @param conf         - Hdfs Config
+   * @param blockMover   - Object that supports moving blocks.
+   */
+  public DiskBalancer(String dataNodeUUID,
+                      Configuration conf, BlockMover blockMover) {
+    // Collaborators supplied by the caller.
+    this.blockMover = blockMover;
+    this.dataset = blockMover.getDataset();
+    this.dataNodeUUID = dataNodeUUID;
+
+    // Internal state: a single worker thread runs the plan, the lock
+    // serializes submit/shutdown, and the map holds the admitted work items.
+    this.scheduler = Executors.newSingleThreadExecutor();
+    this.lock = new ReentrantLock();
+    this.workMap = new ConcurrentHashMap<>();
+
+    final boolean enabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
+        DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
+    this.isDiskBalancerEnabled = enabled;
+  }
+
+  /**
+   * Shuts down disk balancer services.
+   * <p/>
+   * Disables further plan submissions, asks the block mover to stop if a plan
+   * is still running, and always shuts down the executor so that an idle
+   * worker thread left behind by an already completed plan is released too.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      this.isDiskBalancerEnabled = false;
+      if ((this.future != null) && (!this.future.isDone())) {
+        // A plan is still executing; tell copyBlocks to bail out.
+        this.blockMover.setExitFlag();
+      }
+      // Shut the executor down unconditionally. A single thread executor
+      // keeps its worker alive after a task completes, so skipping this when
+      // no plan is running would leave that thread behind.
+      shutdownExecutor();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Stops the scheduler: first an orderly shutdown, then a forced one if the
+   * running task does not finish within 30 seconds.
+   */
+  private void shutdownExecutor() {
+    // Orderly shutdown: no new tasks are accepted, a running task may finish.
+    scheduler.shutdown();
+    try {
+      if (scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+        return;
+      }
+      // Grace period expired; interrupt the worker and wait once more.
+      scheduler.shutdownNow();
+      boolean terminated = scheduler.awaitTermination(30, TimeUnit.SECONDS);
+      if (!terminated) {
+        LOG.error("Disk Balancer : Scheduler did not terminate.");
+      }
+    } catch (InterruptedException ex) {
+      // Interrupted while waiting: force the shutdown and preserve the
+      // interrupt status for the caller.
+      scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Takes a client submitted plan and converts into a set of work items that
+   * can be executed by the blockMover.
+   *
+   * @param planID      - A SHA512 of the plan string
+   * @param planVersion - version of the plan string - for future use.
+   * @param plan        - Actual Plan
+   * @param bandwidth   - BytesPerSec to copy
+   * @param force       - Skip some validations and execute the plan file.
+   * @throws DiskBalancerException
+   */
+  public void submitPlan(String planID, long planVersion, String plan,
+                         long bandwidth, boolean force)
+      throws DiskBalancerException {
+
+    lock.lock();
+    try {
+      checkDiskBalancerEnabled();
+
+      // Only one plan may execute at a time; the future of the previous
+      // submission tells us whether the worker is still busy.
+      final boolean planRunning =
+          (this.future != null) && !this.future.isDone();
+      if (planRunning) {
+        LOG.error("Disk Balancer - Executing another plan, submitPlan failed.");
+        throw new DiskBalancerException("Executing another plan",
+            DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
+      }
+
+      // Validate first, then turn the plan into work items and start it.
+      final NodePlan validatedPlan =
+          verifyPlan(planID, planVersion, plan, bandwidth, force);
+      createWorkPlan(validatedPlan);
+      this.planID = planID;
+      executePlan();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws if Disk balancer is disabled.
+   *
+   * @throws DiskBalancerException
+   */
+  private void checkDiskBalancerEnabled()
+      throws DiskBalancerException {
+    if (isDiskBalancerEnabled) {
+      return;
+    }
+    LOG.error("Disk Balancer is not enabled.");
+    throw new DiskBalancerException("Disk Balancer is not enabled.",
+        DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED);
+  }
+
+  /**
+   * Verifies that user provided plan is valid.
+   *
+   * @param planID      - SHA 512 of the plan.
+   * @param planVersion - Version of the plan, for future use.
+   * @param plan        - Plan String in Json.
+   * @param bandwidth   - Max disk bandwidth to use per second.
+   * @param force       - Skip verifying when the plan was generated.
+   * @return a NodePlan Object.
+   * @throws DiskBalancerException
+   */
+  private NodePlan verifyPlan(String planID, long planVersion, String plan,
+                              long bandwidth, boolean force)
+      throws DiskBalancerException {
+
+    // Callers must already hold the lock.
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+
+    // Checks run in a fixed order: version, hash (which also parses the
+    // plan), age and finally the target node. The bandwidth argument is not
+    // examined by any of them.
+    verifyPlanVersion(planVersion);
+    final NodePlan parsedPlan = verifyPlanHash(planID, plan);
+    if (!force) {
+      // The force flag only skips the check on when the plan was generated.
+      verifyTimeStamp(parsedPlan);
+    }
+    verifyNodeUUID(parsedPlan);
+    return parsedPlan;
+  }
+
+  /**
+   * Verifies the plan version is something that we support.
+   *
+   * @param planVersion - Long version.
+   * @throws DiskBalancerException
+   */
+  private void verifyPlanVersion(long planVersion)
+      throws DiskBalancerException {
+    // Supported versions form the closed range [MIN_VERSION, MAX_VERSION].
+    final boolean supported =
+        (planVersion >= DiskBalancerConstants.DISKBALANCER_MIN_VERSION) &&
+        (planVersion <= DiskBalancerConstants.DISKBALANCER_MAX_VERSION);
+    if (supported) {
+      return;
+    }
+    LOG.error("Disk Balancer - Invalid plan version.");
+    throw new DiskBalancerException("Invalid plan version.",
+        DiskBalancerException.Result.INVALID_PLAN_VERSION);
+  }
+
+  /**
+   * Checks that the plan text is present and that its SHA-512 digest matches
+   * the plan ID provided by the client, then parses the plan.
+   *
+   * @param planID - Hex encoded SHA-512 of the plan string
+   * @param plan   - Plan String in Json
+   * @return the NodePlan parsed from the plan string
+   * @throws DiskBalancerException if the plan is empty, the hash is invalid
+   *                               or mismatched, or parsing fails.
+   */
+  private NodePlan verifyPlanHash(String planID, String plan)
+      throws DiskBalancerException {
+    // A SHA-512 digest rendered as hex is 128 characters long.
+    final long sha512Length = 128;
+
+    if (plan == null || plan.isEmpty()) {
+      LOG.error("Disk Balancer -  Invalid plan.");
+      throw new DiskBalancerException("Invalid plan.",
+          DiskBalancerException.Result.INVALID_PLAN);
+    }
+
+    // Only compute the digest when the supplied ID has a plausible shape.
+    boolean hashMatches = false;
+    if ((planID != null) && (planID.length() == sha512Length)) {
+      String computedHash =
+          DigestUtils.sha512Hex(plan.getBytes(Charset.forName("UTF-8")));
+      hashMatches = computedHash.equalsIgnoreCase(planID);
+    }
+    if (!hashMatches) {
+      LOG.error("Disk Balancer - Invalid plan hash.");
+      throw new DiskBalancerException("Invalid or mis-matched hash.",
+          DiskBalancerException.Result.INVALID_PLAN_HASH);
+    }
+
+    try {
+      return NodePlan.parseJson(plan);
+    } catch (IOException ex) {
+      throw new DiskBalancerException("Parsing plan failed.", ex,
+          DiskBalancerException.Result.MALFORMED_PLAN);
+    }
+  }
+
+  /**
+   * Verifies that this plan is not older than
+   * DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS hours.
+   *
+   * @param plan - Node Plan
+   * @throws DiskBalancerException with OLD_PLAN_SUBMITTED if the plan is
+   *                               too old.
+   */
+  private void verifyTimeStamp(NodePlan plan) throws DiskBalancerException {
+    final long now = Time.now();
+
+    // TODO : Support Valid Plan hours as a user configurable option.
+    final long validityMillis = TimeUnit.HOURS.toMillis(
+        DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS);
+    final long expiryTime = plan.getTimeStamp() + validityMillis;
+    if (expiryTime >= now) {
+      return;
+    }
+
+    String hourString = "Plan was generated more than "
+        + DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS
+        + " hours ago.";
+    LOG.error("Disk Balancer - " + hourString);
+    throw new DiskBalancerException(hourString,
+        DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
+  }
+
+  /**
+   * Verifies that the plan carries a node UUID and that it is the UUID of
+   * this datanode.
+   *
+   * @param plan - Node Plan
+   * @throws DiskBalancerException with DATANODE_ID_MISMATCH otherwise.
+   */
+  private void verifyNodeUUID(NodePlan plan) throws DiskBalancerException {
+    final String planNodeUuid = plan.getNodeUUID();
+    final boolean forThisNode =
+        (planNodeUuid != null) && planNodeUuid.equals(this.dataNodeUUID);
+    if (forThisNode) {
+      return;
+    }
+    LOG.error("Disk Balancer - Plan was generated for another node.");
+    throw new DiskBalancerException(
+        "Plan was generated for another node.",
+        DiskBalancerException.Result.DATANODE_ID_MISMATCH);
+  }
+
+  /**
+   * Converts a node plan into DiskBalancerWorkItems that the Datanode can
+   * execute. Every step's source and destination volume is resolved through
+   * its storage ID; an unknown volume rejects the plan.
+   *
+   * @param plan - Node Plan
+   * @throws DiskBalancerException with INVALID_VOLUME if a volume named by
+   *                               the plan cannot be found.
+   */
+  private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+
+    // Cleanup any residual work in the map.
+    workMap.clear();
+    final Map<String, FsVolumeSpi> volumesById = getStorageIDToVolumeMap();
+
+    for (Step step : plan.getVolumeSetPlans()) {
+      final String sourceId = step.getSourceVolume().getUuid();
+      final String destinationId = step.getDestinationVolume().getUuid();
+
+      final FsVolumeSpi sourceVol = volumesById.get(sourceId);
+      if (sourceVol == null) {
+        LOG.error("Disk Balancer - Unable to find source volume. submitPlan " +
+            "failed.");
+        throw new DiskBalancerException("Unable to find source volume.",
+            DiskBalancerException.Result.INVALID_VOLUME);
+      }
+
+      final FsVolumeSpi destVol = volumesById.get(destinationId);
+      if (destVol == null) {
+        LOG.error("Disk Balancer - Unable to find destination volume. " +
+            "submitPlan failed.");
+        throw new DiskBalancerException("Unable to find destination volume.",
+            DiskBalancerException.Result.INVALID_VOLUME);
+      }
+
+      createWorkPlan(sourceVol, destVol, step.getBytesToMove());
+    }
+  }
+
+  /**
+   * Returns a map from storage ID to volume for the volumes of the dataset.
+   * <p/>
+   * The volume references are obtained and released inside this method; they
+   * are closed even if reading the volumes fails.
+   *
+   * @return Map of storage ID to FsVolumeSpi
+   * @throws DiskBalancerException with INTERNAL_ERROR if the volume
+   *                               references cannot be read or closed.
+   */
+  private Map<String, FsVolumeSpi> getStorageIDToVolumeMap()
+      throws DiskBalancerException {
+    Map<String, FsVolumeSpi> pathMap = new HashMap<>();
+    try {
+      synchronized (this.dataset) {
+        // try-with-resources guarantees the references are closed even when
+        // the loop below throws, which the explicit close() call did not.
+        try (FsDatasetSpi.FsVolumeReferences references =
+                 this.dataset.getFsVolumeReferences()) {
+          for (int ndx = 0; ndx < references.size(); ndx++) {
+            FsVolumeSpi vol = references.get(ndx);
+            pathMap.put(vol.getStorageID(), vol);
+          }
+        }
+      }
+    } catch (IOException ex) {
+      LOG.error("Disk Balancer - Internal Error.", ex);
+      throw new DiskBalancerException("Internal error", ex,
+          DiskBalancerException.Result.INTERNAL_ERROR);
+    }
+    return pathMap;
+  }
+
+  /**
+   * Starts executing the plan on the scheduler thread.
+   * <p/>
+   * The work is only submitted here: this method returns as soon as the task
+   * has been handed to the scheduler and does not wait for the plan to
+   * finish. The returned future is stored so that submitPlan and shutdown can
+   * check whether a plan is still running. Must be called with the lock held.
+   */
+  private void executePlan() {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    this.blockMover.setRunnable();
+    // shutdownExecutor() may have stopped the scheduler; create a new one.
+    if (this.scheduler.isShutdown()) {
+      this.scheduler = Executors.newSingleThreadExecutor();
+    }
+
+    this.future = scheduler.submit(new Runnable() {
+      @Override
+      public void run() {
+        Thread.currentThread().setName("DiskBalancerThread");
+        LOG.info("Executing Disk balancer plan. Plan ID -  " + planID);
+
+        // Hand every admitted <source, dest> work item to the block mover,
+        // one after the other.
+        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+            workMap.entrySet()) {
+          blockMover.copyBlocks(entry.getKey(), entry.getValue());
+        }
+      }
+    });
+  }
+
+  /**
+   * Adds a work item for the given volume pair to the work map. If the pair
+   * is already present, the byte counts are added up into a single item.
+   *
+   * @param source      - Source vol
+   * @param dest        - destination volume
+   * @param bytesToMove - number of bytes to move
+   * @throws DiskBalancerException with INVALID_MOVE if source and destination
+   *                               have the same storage ID.
+   */
+  private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
+                              long bytesToMove) throws DiskBalancerException {
+
+    final boolean sameVolume =
+        source.getStorageID().equals(dest.getStorageID());
+    if (sameVolume) {
+      throw new DiskBalancerException("Same source and destination",
+          DiskBalancerException.Result.INVALID_MOVE);
+    }
+
+    final VolumePair volumes = new VolumePair(source, dest);
+
+    // In case we have a plan with more than
+    // one line of same <source, dest>
+    // we compress that into one work order.
+    long totalBytes = bytesToMove;
+    final DiskBalancerWorkItem pending = workMap.get(volumes);
+    if (pending != null) {
+      totalBytes += pending.getBytesToCopy();
+    }
+
+    workMap.put(volumes, new DiskBalancerWorkItem(totalBytes, 0));
+  }
+
+  /**
+   * BlockMover supports moving blocks across Volumes.
+   */
+  public interface BlockMover {
+    /**
+     * Copies blocks from the source volume of the pair to its destination
+     * volume. DiskBalancer calls this once per work item from its worker
+     * thread.
+     *
+     * @param pair - Source and Destination Volumes.
+     * @param item - Work item describing the move, including the number of
+     *             bytes to copy.
+     */
+    void copyBlocks(VolumePair pair, DiskBalancerWorkItem item);
+
+    /**
+     * Begin the actual copy operations. This is useful in testing.
+     * DiskBalancer calls this right before it schedules a plan.
+     */
+    void setRunnable();
+
+    /**
+     * Tells copyBlocks to exit from the copy routine. DiskBalancer calls this
+     * on shutdown while a plan is still running.
+     */
+    void setExitFlag();
+
+    /**
+     * Returns a pointer to the current dataset we are operating against.
+     *
+     * @return FsDatasetSpi
+     */
+    FsDatasetSpi getDataset();
+  }
+
+  /**
+   * Holds references to actual volumes that we will be operating against.
+   */
+  static class VolumePair {
+    private final FsVolumeSpi source;
+    private final FsVolumeSpi dest;
+
+    /**
+     * Creates a pair out of the volume data is moved from and the volume
+     * data is moved to.
+     *
+     * @param source - Source Volume
+     * @param dest   - Destination Volume
+     */
+    public VolumePair(FsVolumeSpi source, FsVolumeSpi dest) {
+      this.source = source;
+      this.dest = dest;
+    }
+
+    /**
+     * Returns the volume blocks are copied from.
+     *
+     * @return source volume
+     */
+    public FsVolumeSpi getSource() {
+      return source;
+    }
+
+    /**
+     * Returns the volume blocks are copied to.
+     *
+     * @return destination volume
+     */
+    public FsVolumeSpi getDest() {
+      return dest;
+    }
+
+    /**
+     * Two pairs are equal when both their source volumes and their
+     * destination volumes are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null) {
+        return false;
+      }
+      if (getClass() != o.getClass()) {
+        return false;
+      }
+
+      final VolumePair other = (VolumePair) o;
+      if (!source.equals(other.source)) {
+        return false;
+      }
+      return dest.equals(other.dest);
+    }
+
+    /**
+     * Hash built from the base paths of the two volumes.
+     * <p/>
+     * NOTE(review): equals compares the volume objects while this hash uses
+     * their base paths; confirm that equal volumes always report the same
+     * base path.
+     */
+    @Override
+    public int hashCode() {
+      final int sourceHash = source.getBasePath().hashCode();
+      final int destHash = dest.getBasePath().hashCode();
+      return 31 * sourceHash + destHash;
+    }
+  }
+
+  /**
+   * Actual DataMover class for DiskBalancer.
+   * <p/>
+   * TODO : Add implementation for this class. This is here as a place holder so
+   * that Datanode can make calls into this class.
+   */
+  public static class DiskBalancerMover implements BlockMover {
+    // Dataset handed in at construction time and returned by getDataset().
+    private final FsDatasetSpi dataset;
+
+    /**
+     * Constructs diskBalancerMover.
+     *
+     * @param dataset Dataset
+     * @param conf    Configuration, not read yet (see the TODO below).
+     */
+    public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
+      this.dataset = dataset;
+      // TODO : Read Config values.
+    }
+
+    /**
+     * Copies blocks from a set of volumes. Placeholder: the body is empty,
+     * so no data is moved yet.
+     *
+     * @param pair - Source and Destination Volumes.
+     * @param item - Work item describing the move, including the number of
+     *             bytes to copy.
+     */
+    @Override
+    public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
+
+    }
+
+    /**
+     * Begin the actual copy operations. This is useful in testing.
+     * Placeholder: currently a no-op.
+     */
+    @Override
+    public void setRunnable() {
+
+    }
+
+    /**
+     * Tells copyBlocks to exit from the copy routine.
+     * Placeholder: currently a no-op.
+     */
+    @Override
+    public void setExitFlag() {
+
+    }
+
+    /**
+     * Returns a pointer to the current dataset we are operating against.
+     *
+     * @return FsDatasetSpi
+     */
+    @Override
+    public FsDatasetSpi getDataset() {
+      return dataset;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
index 553827e..7144a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java
@@ -29,6 +29,15 @@ public final class DiskBalancerConstants {
   public static final String DISKBALANCER_VOLUME_NAME =
       "DiskBalancerVolumeName";
 
+  /** Min and Max Plan file versions that we know of. **/
+  public static final int DISKBALANCER_MIN_VERSION = 1;
+  public static final int DISKBALANCER_MAX_VERSION = 1;
+
+  /**
+   * A plan is treated as stale once it is older than the number of hours
+   * defined by the constant below. Defaults to 24 hours.
+   */
+  public static final int DISKBALANCER_VALID_PLAN_HOURS = 24;
   // never constructed.
   private DiskBalancerConstants() {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
new file mode 100644
index 0000000..a5e1581
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import java.io.IOException;
+
+/**
+ * Disk Balancer Exceptions.
+ */
+public class DiskBalancerException extends IOException {
+  /** Possible results from DiskBalancer. **/
+  public enum Result {
+    DISK_BALANCER_NOT_ENABLED,
+    INVALID_PLAN_VERSION,
+    INVALID_PLAN,
+    INVALID_PLAN_HASH,
+    OLD_PLAN_SUBMITTED,
+    DATANODE_ID_MISMATCH,
+    MALFORMED_PLAN,
+    PLAN_ALREADY_IN_PROGRESS,
+    INVALID_VOLUME,
+    INVALID_MOVE,
+    INTERNAL_ERROR
+  }
+
+  // Result code describing why the operation failed.
+  private final Result result;
+
+  /**
+   * Constructs an {@code IOException} with the specified detail message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   *                the {@link #getMessage()} method)
+   * @param result  The result code (which is saved for later retrieval by
+   *                the {@link #getResult()} method)
+   */
+  public DiskBalancerException(String message, Result result) {
+    super(message);
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified detail message and
+   * cause.
+   * <p/>
+   * <p> Note that the detail message associated with {@code cause} is
+   * <i>not</i>
+   * automatically incorporated into this exception's detail message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   *                the
+   *                {@link #getMessage()} method)
+   * @param cause   The cause (which is saved for later retrieval by the {@link
+   *                #getCause()} method).  (A null value is permitted, and
+   *                indicates that the cause is nonexistent or unknown.)
+   * @param result  The result code (which is saved for later retrieval by
+   *                the {@link #getResult()} method)
+   */
+  public DiskBalancerException(String message, Throwable cause, Result result) {
+    super(message, cause);
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified cause and a detail
+   * message of {@code (cause==null ? null : cause.toString())} (which typically
+   * contains the class and detail message of {@code cause}). This
+   * constructor is useful for IO exceptions that are little more than
+   * wrappers for other throwables.
+   *
+   * @param cause  The cause (which is saved for later retrieval by the {@link
+   *               #getCause()} method).  (A null value is permitted, and
+   *               indicates
+   *               that the cause is nonexistent or unknown.)
+   * @param result The result code (which is saved for later retrieval by
+   *               the {@link #getResult()} method)
+   */
+  public DiskBalancerException(Throwable cause, Result result) {
+    super(cause);
+    this.result = result;
+  }
+
+  /**
+   * Returns the result.
+   * @return the {@link Result} code this exception was constructed with
+   */
+  public Result getResult() {
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
deleted file mode 100644
index 9d47dc3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdfs.server.diskbalancer;
-
-import java.io.IOException;
-
-/**
- * Disk Balancer Exceptions.
- */
-public class DiskbalancerException extends IOException {
-  private int result;
-
-  /**
-   * Constructs an {@code IOException} with the specified detail message.
-   *
-   * @param message The detail message (which is saved for later retrieval by
-   *                the
-   *                {@link #getMessage()} method)
-   */
-  public DiskbalancerException(String message, int result) {
-    super(message);
-    this.result = result;
-  }
-
-  /**
-   * Constructs an {@code IOException} with the specified detail message and
-   * cause.
-   * <p/>
-   * <p> Note that the detail message associated with {@code cause} is
-   * <i>not</i>
-   * automatically incorporated into this exception's detail message.
-   *
-   * @param message The detail message (which is saved for later retrieval by
-   *                the
-   *                {@link #getMessage()} method)
-   * @param cause   The cause (which is saved for later retrieval by the {@link
-   *                #getCause()} method).  (A null value is permitted, and
-   *                indicates that the cause is nonexistent or unknown.)
-   * @since 1.6
-   */
-  public DiskbalancerException(String message, Throwable cause, int result) {
-    super(message, cause);
-    this.result = result;
-  }
-
-  /**
-   * Constructs an {@code IOException} with the specified cause and a detail
-   * message of {@code (cause==null ? null : cause.toString())} (which typically
-   * contains the class and detail message of {@code cause}). This
-   * constructor is
-   * useful for IO exceptions that are little more than wrappers for other
-   * throwables.
-   *
-   * @param cause The cause (which is saved for later retrieval by the {@link
-   *              #getCause()} method).  (A null value is permitted, and
-   *              indicates
-   *              that the cause is nonexistent or unknown.)
-   * @since 1.6
-   */
-  public DiskbalancerException(Throwable cause, int result) {
-    super(cause);
-    this.result = result;
-  }
-
-  /**
-   * Returns the result.
-   * @return int
-   */
-  public int getResult() {
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
index af9e9af..c86fc9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
@@ -358,4 +358,18 @@ public class DiskBalancerCluster {
       return (10 - modValue) + threadRatio;
     }
   }
+
+  /**
+   * Returns a node by UUID.
+   *
+   * @param uuid - Node's UUID
+   * @return the matching DiskBalancerDataNode, or null if no node in this
+   * cluster has the given UUID.
+   */
+  public DiskBalancerDataNode getNodeByUUID(String uuid) {
+    for (DiskBalancerDataNode node : this.getNodes()) {
+      // Guard against a node without a UUID so that one incomplete entry
+      // cannot abort the lookup with a NullPointerException.
+      String nodeUuid = node.getDataNodeUUID();
+      if (nodeUuid != null && nodeUuid.equals(uuid)) {
+        return node;
+      }
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b1b2faf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 143b776..dc24787 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.diskbalancer;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -35,9 +36,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
-import java.net.URI;
-
 public class TestDiskBalancerRPC {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -48,6 +46,7 @@ public class TestDiskBalancerRPC {
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     cluster.waitActive();
   }
@@ -72,22 +71,19 @@ public class TestDiskBalancerRPC {
     Assert.assertEquals(cluster.getDataNodes().size(),
                                     diskBalancerCluster.getNodes().size());
     diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(dnIndex);
+
+    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+    DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
+        dataNode.getDatanodeUuid());
     GreedyPlanner planner = new GreedyPlanner(10.0f, node);
     NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
         ());
     planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-    final int planVersion = 0; // So far we support only one version.
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+    final int planVersion = 1; // So far we support only one version.
 
     String planHash = DigestUtils.sha512Hex(plan.toJson());
 
-    // Since submitDiskBalancerPlan is not implemented yet, it throws an
-    // Exception, this will be modified with the actual implementation.
-    thrown.expect(DiskbalancerException.class);
     dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-
-
   }
 
   @Test
@@ -117,10 +113,10 @@ public class TestDiskBalancerRPC {
     // Exception, this will be modified with the actual implementation.
     try {
       dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-    } catch (DiskbalancerException ex) {
+    } catch (DiskBalancerException ex) {
       // Let us ignore this for time being.
     }
-    thrown.expect(DiskbalancerException.class);
+    thrown.expect(DiskBalancerException.class);
     dataNode.cancelDiskBalancePlan(planHash);
   }
 
@@ -152,13 +148,13 @@ public class TestDiskBalancerRPC {
     // Exception, this will be modified with the actual implementation.
     try {
       dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-    } catch (DiskbalancerException ex) {
+    } catch (DiskBalancerException ex) {
       // Let us ignore this for time being.
     }
 
     // TODO : This will be fixed when we have implementation for this
     // function in server side.
-    thrown.expect(DiskbalancerException.class);
+    thrown.expect(DiskBalancerException.class);
     dataNode.queryDiskBalancerPlan();
   }
 
@@ -166,7 +162,7 @@ public class TestDiskBalancerRPC {
   public void testgetDiskBalancerSetting() throws Exception {
     final int dnIndex = 0;
     DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    thrown.expect(DiskbalancerException.class);
+    thrown.expect(DiskBalancerException.class);
     dataNode.getDiskBalancerSetting(
         DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org