Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 01:35:58 UTC

[02/49] hadoop git commit: HDFS-9720. DiskBalancer : Add configuration parameters. Contributed by Anu Engineer.

HDFS-9720. DiskBalancer : Add configuration parameters. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05067707
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05067707
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05067707

Branch: refs/heads/HDFS-1312
Commit: 050677077beaf42255b3936952b8e816a9201203
Parents: 6c606bf
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Apr 5 12:23:35 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:18:48 2016 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/ClientDatanodeProtocol.java   |  4 +-
 .../ClientDatanodeProtocolTranslatorPB.java     |  8 +-
 .../server/datanode/DiskBalancerWorkItem.java   | 77 +++++++++++++++++++
 .../src/main/proto/ClientDatanodeProtocol.proto |  2 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   | 15 ++++
 ...tDatanodeProtocolServerSideTranslatorPB.java |  6 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  4 +-
 .../hdfs/server/datanode/DiskBalancer.java      | 81 +++++++++++++++-----
 .../server/diskbalancer/planner/MoveStep.java   | 75 ++++++++++++++++++
 .../hdfs/server/diskbalancer/planner/Step.java  | 23 ++++++
 .../diskbalancer/TestDiskBalancerRPC.java       | 31 ++++----
 .../TestDiskBalancerWithMockMover.java          | 37 ++++++++-
 .../hdfs/server/diskbalancer/TestPlanner.java   | 29 ++++---
 13 files changed, 328 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index d8df7fb..3993ce5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -168,8 +168,8 @@ public interface ClientDatanodeProtocol {
   /**
    * Submit a disk balancer plan for execution.
    */
-  void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth,
-                              String plan) throws IOException;
+  void submitDiskBalancerPlan(String planID, long planVersion, String plan,
+                              boolean skipDateCheck) throws IOException;
 
   /**
    * Cancel an executing plan.

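A minimal usage sketch of the revised interface, assuming an already-constructed ClientDatanodeProtocol proxy named proxy and a NodePlan named plan (this snippet is illustrative only, not part of the committed patch):

    // Sketch: 'proxy' and 'plan' are assumed to exist; plan.toJson() and
    // DigestUtils.sha512Hex() are used the same way as in the tests below.
    String planJson = plan.toJson();
    String planID = DigestUtils.sha512Hex(planJson);   // SHA512 hash of the plan
    // planVersion = 1; skipDateCheck = false keeps the normal date validation.
    proxy.submitDiskBalancerPlan(planID, 1, planJson, false);
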
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 7076026..4f314e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -342,22 +342,20 @@ public class ClientDatanodeProtocolTranslatorPB implements
    *               local copies of these plans.
    * @param planVersion - The data format of the plans - for future , not
    *                    used now.
-   * @param bandwidth - Maximum disk bandwidth to consume, setting this value
-   *                  to zero allows datanode to use the value defined in
-   *                  configration.
    * @param plan - Actual plan.
+   * @param skipDateCheck - Skips the date check.
    * @throws IOException
    */
   @Override
   public void submitDiskBalancerPlan(String planID, long planVersion,
-      long bandwidth, String plan) throws IOException {
+      String plan, boolean skipDateCheck) throws IOException {
     try {
       SubmitDiskBalancerPlanRequestProto request =
           SubmitDiskBalancerPlanRequestProto.newBuilder()
               .setPlanID(planID)
               .setPlanVersion(planVersion)
-              .setMaxDiskBandwidth(bandwidth)
               .setPlan(plan)
+              .setIgnoreDateCheck(skipDateCheck)
               .build();
       rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request);
     } catch (ServiceException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
index 11730e2..7381499 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import java.io.IOException;
@@ -31,6 +32,7 @@ import java.io.IOException;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class DiskBalancerWorkItem {
   private final long bytesToCopy;
   private long bytesCopied;
@@ -38,6 +40,10 @@ public class DiskBalancerWorkItem {
   private String errMsg;
   private long blocksCopied;
 
+  private long maxDiskErrors;
+  private long tolerancePercent;
+  private long bandwidth;
+
   /**
    * Constructs a DiskBalancerWorkItem.
    *
@@ -157,4 +163,75 @@ public class DiskBalancerWorkItem {
     return mapper.writeValueAsString(this);
   }
 
+  /**
+   * Sets the error count for this step.
+   *
+   * @param errorCount long.
+   */
+  public void setErrorCount(long errorCount) {
+    this.errorCount = errorCount;
+  }
+
+  /**
+   * Number of blocks copied so far.
+   *
+   * @param blocksCopied Blocks copied.
+   */
+  public void setBlocksCopied(long blocksCopied) {
+    this.blocksCopied = blocksCopied;
+  }
+
+  /**
+   * Gets maximum disk errors to tolerate before we fail this copy step.
+   *
+   * @return long.
+   */
+  public long getMaxDiskErrors() {
+    return maxDiskErrors;
+  }
+
+  /**
+   * Sets maximum disk errors to tolerate before we fail this copy step.
+   *
+   * @param maxDiskErrors long
+   */
+  public void setMaxDiskErrors(long maxDiskErrors) {
+    this.maxDiskErrors = maxDiskErrors;
+  }
+
+  /**
+   * Allowed deviation from ideal storage in percentage.
+   *
+   * @return long
+   */
+  public long getTolerancePercent() {
+    return tolerancePercent;
+  }
+
+  /**
+   * Sets the tolerance percentage.
+   *
+   * @param tolerancePercent - tolerance.
+   */
+  public void setTolerancePercent(long tolerancePercent) {
+    this.tolerancePercent = tolerancePercent;
+  }
+
+  /**
+   * Max disk bandwidth to use. MB per second.
+   *
+   * @return - long.
+   */
+  public long getBandwidth() {
+    return bandwidth;
+  }
+
+  /**
+   * Sets max disk bandwidth to use, in MBs per second.
+   *
+   * @param bandwidth - long.
+   */
+  public void setBandwidth(long bandwidth) {
+    this.bandwidth = bandwidth;
+  }
 }
\ No newline at end of file

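The intended effect of the new @JsonInclude(NON_DEFAULT) annotation is that fields left at their default value are omitted when the work item is serialized, so only explicitly set values travel over the wire. A hedged sketch (not part of the patch; it assumes the two-argument constructor and the JSON helper suggested by the ObjectMapper usage above):

    // Sketch: bytesToCopy = 1024, bytesCopied = 0.
    DiskBalancerWorkItem item = new DiskBalancerWorkItem(1024L, 0L);
    item.setBandwidth(100);        // non-default, expected in the JSON output
    // maxDiskErrors and tolerancePercent stay 0 and should be omitted
    String json = item.toJson();
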
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index c7cd4fb..e0a7fd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -157,7 +157,7 @@ message SubmitDiskBalancerPlanRequestProto {
   required string planID = 1; // A hash of the plan like SHA512
   required string plan = 2; // Json String that describes the plan
   optional uint64 planVersion = 3; // Plan version number
-  optional uint64 maxDiskBandwidth = 4; // optional bandwidth control.
+  optional bool ignoreDateCheck = 4; // Ignore date checks on this plan.
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 224ab3d..6640ec6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -935,6 +935,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.disk.balancer.enabled";
   public static final boolean DFS_DISK_BALANCER_ENABLED_DEFAULT = false;
 
+  public static final String DFS_DISK_BALANCER_MAX_DISK_THRUPUT =
+      "dfs.disk.balancer.max.disk.throughputInMBperSec";
+  public static final int DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT =
+      10;
+
+  public static final String DFS_DISK_BALANCER_MAX_DISK_ERRORS =
+      "dfs.disk.balancer.max.disk.errors";
+  public static final int DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT = 5;
+
+
+  public static final String DFS_DISK_BALANCER_BLOCK_TOLERANCE =
+      "dfs.disk.balancer.block.tolerance.percent";
+  public static final int DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT = 5;
+
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

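The new keys can be set from code as well as hdfs-site.xml; a brief sketch using the default values above (illustrative only, not part of the patch):

    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    // Maximum copy throughput per move step, in MB/s.
    conf.setInt(DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT, 10);
    // Disk errors tolerated before a copy step is failed.
    conf.setInt(DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS, 5);
    // Allowed deviation from ideal storage, in percent.
    conf.setInt(DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE, 5);
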
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index d72a060..482e86f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -254,9 +254,9 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       throws ServiceException {
     try {
       impl.submitDiskBalancerPlan(request.getPlanID(),
-          request.hasPlanVersion() ? request.getPlanVersion() : 0,
-          request.hasMaxDiskBandwidth() ? request.getMaxDiskBandwidth() : 0,
-          request.getPlan());
+          request.hasPlanVersion() ? request.getPlanVersion() : 1,
+          request.getPlan(),
+          request.hasIgnoreDateCheck() ? request.getIgnoreDateCheck() : false);
       SubmitDiskBalancerPlanResponseProto response =
           SubmitDiskBalancerPlanResponseProto.newBuilder()
               .build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 8a61291..776da3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3331,11 +3331,11 @@ public class DataNode extends ReconfigurableBase
    */
   @Override
   public void submitDiskBalancerPlan(String planID,
-      long planVersion, long bandwidth, String plan) throws IOException {
+      long planVersion, String plan, boolean skipDateCheck) throws IOException {
 
     checkSuperuserPrivilege();
     // TODO : Support force option
-    this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false);
+    this.diskBalancer.submitPlan(planID, planVersion, plan, skipDateCheck);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 972f0fc..b62a4fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -35,6 +33,8 @@ import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
 import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -99,6 +100,9 @@ public class DiskBalancer {
     this.isDiskBalancerEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
         DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
+    this.bandwidth = conf.getInt(
+        DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
+        DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT);
   }
 
   /**
@@ -144,13 +148,11 @@ public class DiskBalancer {
    * @param planID      - A SHA512 of the plan string
    * @param planVersion - version of the plan string - for future use.
    * @param plan        - Actual Plan
-   * @param bandwidth   - BytesPerSec to copy
    * @param force       - Skip some validations and execute the plan file.
    * @throws DiskBalancerException
    */
   public void submitPlan(String planID, long planVersion, String plan,
-                         long bandwidth, boolean force)
-      throws DiskBalancerException {
+                         boolean force) throws DiskBalancerException {
 
     lock.lock();
     try {
@@ -160,12 +162,10 @@ public class DiskBalancer {
         throw new DiskBalancerException("Executing another plan",
             DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
       }
-      NodePlan nodePlan =
-          verifyPlan(planID, planVersion, plan, bandwidth, force);
+      NodePlan nodePlan = verifyPlan(planID, planVersion, plan, force);
       createWorkPlan(nodePlan);
       this.planID = planID;
       this.currentResult = Result.PLAN_UNDER_PROGRESS;
-      this.bandwidth = bandwidth;
       executePlan();
     } finally {
       lock.unlock();
@@ -292,14 +292,12 @@ public class DiskBalancer {
    * @param planID      - SHA 512 of the plan.
    * @param planVersion - Version of the plan, for future use.
    * @param plan        - Plan String in Json.
-   * @param bandwidth   - Max disk bandwidth to use per second.
    * @param force       - Skip verifying when the plan was generated.
    * @return a NodePlan Object.
    * @throws DiskBalancerException
    */
   private NodePlan verifyPlan(String planID, long planVersion, String plan,
-                              long bandwidth, boolean force)
-      throws DiskBalancerException {
+                               boolean force) throws DiskBalancerException {
 
     Preconditions.checkState(lock.isHeldByCurrentThread());
     verifyPlanVersion(planVersion);
@@ -428,7 +426,7 @@ public class DiskBalancer {
         throw new DiskBalancerException("Unable to find destination volume.",
             DiskBalancerException.Result.INVALID_VOLUME);
       }
-      createWorkPlan(sourceVol, destVol, step.getBytesToMove());
+      createWorkPlan(sourceVol, destVol, step);
     }
   }
 
@@ -488,17 +486,18 @@ public class DiskBalancer {
    *
    * @param source      - Source vol
    * @param dest        - destination volume
-   * @param bytesToMove - number of bytes to move
+   * @param step        - Move Step
    */
   private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
-                              long bytesToMove) throws DiskBalancerException {
+                              Step step) throws DiskBalancerException {
 
     if(source.getStorageID().equals(dest.getStorageID())) {
-      throw new DiskBalancerException("Same source and destination",
-          DiskBalancerException.Result.INVALID_MOVE);
+      LOG.info("Disk Balancer - source & destination volumes are same.");
+      throw new DiskBalancerException("source and destination volumes are " +
+          "same.", DiskBalancerException.Result.INVALID_MOVE);
     }
     VolumePair pair = new VolumePair(source, dest);
-
+    long bytesToMove = step.getBytesToMove();
     // In case we have a plan with more than
     // one line of same <source, dest>
     // we compress that into one work order.
@@ -507,6 +506,12 @@ public class DiskBalancer {
     }
 
     DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
+
+    // all these values can be zero, if so we will use
+    // values from configuration.
+    work.setBandwidth(step.getBandwidth());
+    work.setTolerancePercent(step.getTolerancePercent());
+    work.setMaxDiskErrors(step.getMaxDiskErrors());
     workMap.put(pair, work);
   }
 
@@ -600,11 +605,12 @@ public class DiskBalancer {
   /**
    * Actual DataMover class for DiskBalancer.
    * <p/>
-   * TODO : Add implementation for this class. This is here as a place holder so
-   * that Datanode can make calls into this class.
    */
   public static class DiskBalancerMover implements BlockMover {
     private final FsDatasetSpi dataset;
+    private long diskBandwidth;
+    private long blockTolerance;
+    private long maxDiskErrors;
 
     /**
      * Constructs diskBalancerMover.
@@ -614,7 +620,42 @@ public class DiskBalancer {
      */
     public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
       this.dataset = dataset;
-      // TODO : Read Config values.
+
+      this.diskBandwidth = conf.getLong(
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT);
+
+      this.blockTolerance = conf.getLong(
+          DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE,
+          DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT);
+
+      this.maxDiskErrors = conf.getLong(
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS,
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);
+
+      // Since these are user provided values, make sure they are sane
+      // or ignore faulty values.
+      if (this.diskBandwidth <= 0) {
+        LOG.debug("Found 0 or less as max disk throughput, ignoring config " +
+            "value. value : " + diskBandwidth);
+        diskBandwidth =
+            DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT;
+      }
+
+      if (this.blockTolerance <= 0) {
+        LOG.debug("Found 0 or less for block tolerance value, ignoring config " +
+            "value. value : " + blockTolerance);
+        blockTolerance =
+            DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT;
+
+      }
+
+      if (this.maxDiskErrors < 0) {
+        LOG.debug("Found less than 0 for maxDiskErrors value, ignoring " +
+            "config value. value : " + maxDiskErrors);
+        maxDiskErrors =
+            DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT;
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
index 75af0d6..9a493a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
@@ -19,8 +19,19 @@ package org.apache.hadoop.hdfs.server.diskbalancer.planner;
 
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude;
+
+
+
+
 
 /**
+ * Ignore fields with default values. In most cases throughput, diskErrors,
+ * tolerancePercent and bandwidth will be the system defaults.
+ * So we will avoid serializing them into JSON.
+ */
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+/**
  * Move step is a step that planner can execute that will move data from one
  * volume to another.
  */
@@ -31,6 +42,10 @@ public class MoveStep implements Step {
   private long bytesToMove;
   private String volumeSetID;
 
+  private long maxDiskErrors;
+  private long tolerancePercent;
+  private long bandwidth;
+
   /**
    * Constructs a MoveStep for the volume set.
    *
@@ -178,4 +193,64 @@ public class MoveStep implements Step {
   public String getSizeString(long size) {
     return StringUtils.TraditionalBinaryPrefix.long2String(size, "", 1);
   }
+
+  /**
+   * Gets the maximum number of errors to be tolerated before this
+   * move operation is aborted.
+   * @return  long.
+   */
+  public long getMaxDiskErrors() {
+    return maxDiskErrors;
+  }
+
+  /**
+   * Sets the maximum number of errors to be tolerated before this
+   * step is aborted.
+   * @param maxDiskErrors - long
+   */
+  public void setMaxDiskErrors(long maxDiskErrors) {
+    this.maxDiskErrors = maxDiskErrors;
+  }
+
+  /**
+   * Tolerance Percentage indicates when a move operation is considered good
+   * enough. This is a percentage of deviation from ideal that is considered
+   * fine.
+   *
+   * For example : if the ideal amount on each disk was 1 TB and the
+   * tolerance was 10%, then getting to 900 GB on the destination disk is
+   * considered good enough.
+   *
+   * @return tolerance percentage.
+   */
+  public long getTolerancePercent() {
+    return tolerancePercent;
+  }
+
+  /**
+   * Sets the tolerance percentage.
+   * @param tolerancePercent  - long
+   */
+  public void setTolerancePercent(long tolerancePercent) {
+    this.tolerancePercent = tolerancePercent;
+  }
+
+  /**
+   * Gets the disk bandwidth, that is, the MB/sec to be copied. We will max out
+   * on this amount of throughput. This is useful to prevent too much I/O on
+   * the datanode while it is in use.
+   * @return  long.
+   */
+  public long getBandwidth() {
+    return bandwidth;
+  }
+
+  /**
+   * Sets the maximum disk bandwidth per sec to use for this step.
+   * @param bandwidth  - Long, MB / Sec of data to be moved between
+   *                   source and destination volume.
+   */
+  public void setBandwidth(long bandwidth) {
+    this.bandwidth = bandwidth;
+  }
 }

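A short sketch of the per-step overrides that DiskBalancer.createWorkPlan() now copies onto each DiskBalancerWorkItem; it assumes a NodePlan named plan whose steps are MoveSteps, mirroring testCustomBandwidth below (illustrative only, not part of the patch):

    for (Step step : plan.getVolumeSetPlans()) {
      MoveStep moveStep = (MoveStep) step;
      moveStep.setBandwidth(100);        // MB/s cap for this step
      moveStep.setTolerancePercent(10);  // within 10% of ideal is good enough
      moveStep.setMaxDiskErrors(5);      // abort the step after 5 disk errors
    }
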
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
index d87209e..f13909f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
@@ -65,4 +65,27 @@ public interface Step {
    */
   String getSizeString(long size);
 
+  /**
+   * Returns maximum number of disk errors tolerated.
+   * @return long.
+   */
+  long getMaxDiskErrors();
+
+  /**
+   * Returns tolerance percentage, the good enough value
+   * when we move data from one disk to another.
+   * @return long.
+   */
+  long getTolerancePercent();
+
+  /**
+   * Returns max disk bandwidth that disk balancer will use.
+   * Expressed in MB/sec. For example, a value like 10
+   * indicates that disk balancer will only move 10 MB / sec
+   * while it is running.
+   * @return long.
+   */
+  long getBandwidth();
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index a65ed21..27cd8eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -30,7 +30,9 @@ import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
 import org.hamcrest.*;
 import org.junit.After;
 import org.junit.Assert;
@@ -76,7 +78,8 @@ public class TestDiskBalancerRPC {
     String planHash = rpcTestHelper.getPlanHash();
     int planVersion = rpcTestHelper.getPlanVersion();
     NodePlan plan = rpcTestHelper.getPlan();
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
+        false);
   }
 
   @Test
@@ -91,7 +94,8 @@ public class TestDiskBalancerRPC {
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
     thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_HASH));
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
+        false);
   }
 
   @Test
@@ -104,7 +108,8 @@ public class TestDiskBalancerRPC {
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
     thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_VERSION));
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
+        false);
   }
 
   @Test
@@ -116,8 +121,8 @@ public class TestDiskBalancerRPC {
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
     thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN));
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
-  }
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, "",
+        false);  }
 
   @Test
   public void testCancelPlan() throws Exception {
@@ -126,7 +131,8 @@ public class TestDiskBalancerRPC {
     String planHash = rpcTestHelper.getPlanHash();
     int planVersion = rpcTestHelper.getPlanVersion();
     NodePlan plan = rpcTestHelper.getPlan();
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
+        false);
     dataNode.cancelDiskBalancePlan(planHash);
   }
 
@@ -189,15 +195,14 @@ public class TestDiskBalancerRPC {
     int planVersion = rpcTestHelper.getPlanVersion();
     NodePlan plan = rpcTestHelper.getPlan();
 
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
+        false);
     String bandwidthString = dataNode.getDiskBalancerSetting(
         DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
     long value = Long.decode(bandwidthString);
     Assert.assertEquals(10L, value);
   }
 
-
-
   @Test
   public void testQueryPlan() throws Exception {
     RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
@@ -206,7 +211,8 @@ public class TestDiskBalancerRPC {
     int planVersion = rpcTestHelper.getPlanVersion();
     NodePlan plan = rpcTestHelper.getPlan();
 
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
+        false);
     DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
     Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
         status.getResult() == PLAN_DONE);
@@ -221,7 +227,6 @@ public class TestDiskBalancerRPC {
     Assert.assertTrue(status.getResult() == NO_PLAN);
   }
 
-
   private class RpcTestHelper {
     private NodePlan plan;
     private int planVersion;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
index ed761ed..5032611 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
@@ -50,11 +50,14 @@ import org.junit.rules.ExpectedException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class TestDiskBalancerWithMockMover {
@@ -120,7 +123,7 @@ public class TestDiskBalancerWithMockMover {
                                  int version) throws IOException {
     String planJson = plan.toJson();
     String planID = DigestUtils.sha512Hex(planJson);
-    balancer.submitPlan(planID, version, planJson, 10, false);
+    balancer.submitPlan(planID, version, planJson, false);
   }
 
   private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer)
@@ -209,7 +212,7 @@ public class TestDiskBalancerWithMockMover {
     thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
         .Result.INVALID_PLAN));
 
-    balancer.submitPlan(planID, 1, null, 10, false);
+    balancer.submitPlan(planID, 1, null, false);
   }
 
   @Test
@@ -228,7 +231,7 @@ public class TestDiskBalancerWithMockMover {
     thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
         .Result.INVALID_PLAN_HASH));
     balancer.submitPlan(planID.replace(planID.charAt(0), repChar),
-        1, planJson, 10, false);
+        1, planJson, false);
 
   }
 
@@ -278,6 +281,34 @@ public class TestDiskBalancerWithMockMover {
 
   }
 
+
+  /**
+   * Test Custom bandwidth.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCustomBandwidth() throws Exception {
+    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+    NodePlan plan = mockMoverHelper.getPlan();
+    DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+    for(Step step : plan.getVolumeSetPlans()){
+      MoveStep tempStep = (MoveStep) step;
+      tempStep.setBandwidth(100);
+    }
+    executeSubmitPlan(plan, balancer);
+    DiskBalancerWorkStatus status = balancer
+        .queryWorkStatus();
+    assertNotNull(status);
+
+    DiskBalancerWorkStatus.DiskBalancerWorkEntry entry =
+        balancer.queryWorkStatus().getCurrentState().get(0);
+    assertEquals(100L, entry.getWorkItem().getBandwidth());
+
+  }
+
+
   @Before
   public void setUp() throws Exception {
     Configuration conf = new HdfsConfiguration();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
index f756104..ad18075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
     .DiskBalancerVolumeSet;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
-import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
 import org.junit.Assert;
@@ -48,7 +47,7 @@ public class TestPlanner {
       LoggerFactory.getLogger(TestPlanner.class);
 
   @Test
-  public void TestGreedyPlannerBalanceVolumeSet() throws Exception {
+  public void testGreedyPlannerBalanceVolumeSet() throws Exception {
     URI clusterJson = getClass()
         .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI();
     ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
@@ -65,7 +64,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerComputePlan() throws Exception {
+  public void testGreedyPlannerComputePlan() throws Exception {
     URI clusterJson = getClass()
         .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI();
     ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
@@ -90,13 +89,13 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerNoNodeCluster() throws Exception {
+  public void testGreedyPlannerNoNodeCluster() throws Exception {
     GreedyPlanner planner = new GreedyPlanner(10.0f, null);
     assertNotNull(planner);
   }
 
   @Test
-  public void TestGreedyPlannerNoVolumeTest() throws Exception {
+  public void testGreedyPlannerNoVolumeTest() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
     List<NodePlan> planList = cluster.computePlan(10.0f);
@@ -104,7 +103,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerOneVolumeNoPlanTest() throws Exception {
+  public void testGreedyPlannerOneVolumeNoPlanTest() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
 
@@ -127,7 +126,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerTwoVolume() throws Exception {
+  public void testGreedyPlannerTwoVolume() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
 
@@ -166,7 +165,7 @@ public class TestPlanner {
    * That is the plan should say move 10 GB from volume30 to volume10.
    */
   @Test
-  public void TestGreedyPlannerEqualizeData() throws Exception {
+  public void testGreedyPlannerEqualizeData() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
 
@@ -201,7 +200,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerEqualDisksNoMoves() throws Exception {
+  public void testGreedyPlannerEqualDisksNoMoves() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
 
@@ -232,7 +231,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerMoveFromSingleDisk() throws Exception {
+  public void testGreedyPlannerMoveFromSingleDisk() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
 
@@ -271,7 +270,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerThresholdTest() throws Exception {
+  public void testGreedyPlannerThresholdTest() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
 
@@ -327,7 +326,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestGreedyPlannerPlanWithDifferentDiskSizes() throws Exception {
+  public void testGreedyPlannerPlanWithDifferentDiskSizes() throws Exception {
     NullConnector nullConnector = new NullConnector();
     DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
 
@@ -381,7 +380,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestLoadsCorrectClusterConnector() throws Exception {
+  public void testLoadsCorrectClusterConnector() throws Exception {
     ClusterConnector connector = ConnectorFactory.getCluster(getClass()
             .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI()
         , null);
@@ -392,7 +391,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestPlannerScale() throws Exception {
+  public void testPlannerScale() throws Exception {
     final int diskCount = 256; // it is rare to see more than 48 disks
     DiskBalancerTestUtil util = new DiskBalancerTestUtil();
     DiskBalancerVolumeSet vSet =
@@ -428,7 +427,7 @@ public class TestPlanner {
   }
 
   @Test
-  public void TestNodePlanSerialize() throws Exception {
+  public void testNodePlanSerialize() throws Exception {
     final int diskCount = 12;
     DiskBalancerTestUtil util = new DiskBalancerTestUtil();
     DiskBalancerVolumeSet vSet =

