You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 06:05:53 UTC

[22/49] hadoop git commit: HDFS-9543. DiskBalancer: Add Data mover. Contributed by Anu Engineer.

HDFS-9543. DiskBalancer: Add Data mover. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1594b472
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1594b472
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1594b472

Branch: refs/heads/trunk
Commit: 1594b472bb9df7537dbc001411c99058cc11ba41
Parents: 7820737
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Apr 28 16:12:04 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:20:24 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/datanode/DataNode.java   |   2 -
 .../hdfs/server/datanode/DiskBalancer.java      | 365 +++++++++++++++++--
 .../datamodel/DiskBalancerDataNode.java         |  13 +-
 .../datamodel/DiskBalancerVolume.java           |   6 +-
 .../datamodel/DiskBalancerVolumeSet.java        |  34 +-
 .../server/diskbalancer/planner/MoveStep.java   |  14 +-
 .../hdfs/server/diskbalancer/planner/Step.java  |  20 +-
 .../hdfs/server/balancer/TestBalancer.java      |   3 +-
 .../server/diskbalancer/TestDiskBalancer.java   | 247 +++++++++++++
 .../hdfs/server/diskbalancer/TestPlanner.java   |  28 +-
 10 files changed, 666 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 776da3a..d6be2e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3324,8 +3324,6 @@ public class DataNode extends ReconfigurableBase
    * @param planID  - Hash value of the plan.
    * @param planVersion - Plan version, reserved for future use. We have only
    *                    version 1 now.
-   * @param bandwidth - Max disk bandwidth to use, 0 means use value defined
-   *                  in the configration.
    * @param plan - Actual plan
    * @throws IOException
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index b62a4fc..7f768ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -23,7 +23,9 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.DiskBalancerWorkEntry;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
+    .DiskBalancerWorkEntry;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -48,18 +52,21 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 /**
  * Worker class for Disk Balancer.
- * <p/>
+ * <p>
  * Here is the high level logic executed by this class. Users can submit disk
  * balancing plans using submitPlan calls. After a set of sanity checks the plan
  * is admitted and put into workMap.
- * <p/>
+ * <p>
  * The executePlan launches a thread that picks up work from workMap and hands
  * it over to the BlockMover#copyBlocks function.
- * <p/>
+ * <p>
  * Constraints :
- * <p/>
+ * <p>
  * Only one plan can be executing in a datanode at any given time. This is
  * ensured by checking the future handle of the worker thread in submitPlan.
  */
@@ -127,11 +134,12 @@ public class DiskBalancer {
    * Shutdown the executor.
    */
   private void shutdownExecutor() {
+    final int secondsTowait = 10;
     scheduler.shutdown();
     try {
-      if(!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+      if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
         scheduler.shutdownNow();
-        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+        if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
           LOG.error("Disk Balancer : Scheduler did not terminate.");
         }
       }
@@ -207,6 +215,7 @@ public class DiskBalancer {
 
   /**
    * Cancels a running plan.
+   *
    * @param planID - Hash of the plan to cancel.
    * @throws DiskBalancerException
    */
@@ -297,7 +306,7 @@ public class DiskBalancer {
    * @throws DiskBalancerException
    */
   private NodePlan verifyPlan(String planID, long planVersion, String plan,
-                               boolean force) throws DiskBalancerException {
+                              boolean force) throws DiskBalancerException {
 
     Preconditions.checkState(lock.isHeldByCurrentThread());
     verifyPlanVersion(planVersion);
@@ -372,8 +381,8 @@ public class DiskBalancer {
         (TimeUnit.HOURS.toMillis(
             DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS))) < now) {
       String hourString = "Plan was generated more than " +
-           Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
-          +  " hours ago.";
+          Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
+          + " hours ago.";
       LOG.error("Disk Balancer - " + hourString);
       throw new DiskBalancerException(hourString,
           DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
@@ -484,14 +493,14 @@ public class DiskBalancer {
   /**
    * Insert work items to work map.
    *
-   * @param source      - Source vol
-   * @param dest        - destination volume
-   * @param step        - Move Step
+   * @param source - Source vol
+   * @param dest   - destination volume
+   * @param step   - Move Step
    */
   private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
                               Step step) throws DiskBalancerException {
 
-    if(source.getStorageID().equals(dest.getStorageID())) {
+    if (source.getStorageID().equals(dest.getStorageID())) {
       LOG.info("Disk Balancer - source & destination volumes are same.");
       throw new DiskBalancerException("source and destination volumes are " +
           "same.", DiskBalancerException.Result.INVALID_MOVE);
@@ -604,13 +613,15 @@ public class DiskBalancer {
 
   /**
    * Actual DataMover class for DiskBalancer.
-   * <p/>
+   * <p>
    */
   public static class DiskBalancerMover implements BlockMover {
     private final FsDatasetSpi dataset;
     private long diskBandwidth;
     private long blockTolerance;
     private long maxDiskErrors;
+    private int poolIndex;
+    private AtomicBoolean shouldRun;
 
     /**
      * Constructs diskBalancerMover.
@@ -620,6 +631,7 @@ public class DiskBalancer {
      */
     public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
       this.dataset = dataset;
+      shouldRun = new AtomicBoolean(false);
 
       this.diskBandwidth = conf.getLong(
           DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
@@ -659,30 +671,333 @@ public class DiskBalancer {
     }
 
     /**
-     * Copies blocks from a set of volumes.
-     *
-     * @param pair - Source and Destination Volumes.
-     * @param item - Number of bytes to move from volumes.
+     * Sets Diskmover copyblocks into runnable state.
      */
     @Override
-    public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
-
+    public void setRunnable() {
+      this.shouldRun.set(true);
     }
 
     /**
-     * Begin the actual copy operations. This is useful in testing.
+     * Signals copy block to exit.
      */
     @Override
-    public void setRunnable() {
+    public void setExitFlag() {
+      this.shouldRun.set(false);
+    }
 
+    /**
+     * Returns the shouldRun boolean flag.
+     */
+    public boolean shouldRun() {
+      return this.shouldRun.get();
     }
 
     /**
-     * Tells copyBlocks to exit from the copy routine.
+     * Checks if a given block is less than needed size to meet our goal.
+     *
+     * @param blockSize - block len
+     * @param item      - Work item
+     * @return true if this block meets our criteria, false otherwise.
+     */
+    private boolean isLessThanNeeded(long blockSize,
+                                     DiskBalancerWorkItem item) {
+      // Bytes still owed to the destination, before applying tolerance.
+      final long remaining = item.getBytesToCopy() - item.getBytesCopied();
+      final long slack = (remaining * getBlockTolerancePercentage(item)) / 100;
+      return blockSize <= remaining + slack;
+    }
+
+    /**
+     * Picks the block tolerance for a work item: the item's own value when it
+     * is positive, otherwise the mover-wide default.
+     *
+     * @param item - DiskBalancerWorkItem
+     * @return tolerance as a percentage.
+     */
+    private long getBlockTolerancePercentage(DiskBalancerWorkItem item) {
+      final long planTolerance = item.getTolerancePercent();
+      return (planTolerance > 0) ? planTolerance : this.blockTolerance;
+    }
+
+    /**
+     * Decides whether the copy can stop: bytesCopied is inflated by the
+     * tolerance percentage and compared with the bytes we set out to copy.
+     *
+     * @param item DiskBalancerWorkItem
+     * @return -- false if we need to copy more, true if we are done
+     */
+    private boolean isCloseEnough(DiskBalancerWorkItem item) {
+      final long copied = item.getBytesCopied();
+      final long slack = (copied * getBlockTolerancePercentage(item)) / 100;
+      return item.getBytesToCopy() < copied + slack;
+    }
+
+    /**
+     * Picks the disk bandwidth for a work item: the item's own value when it
+     * is positive, otherwise the mover-wide default.
+     *
+     * @param item DiskBalancerWorkItem.
+     * @return MB/s - long
+     */
+    private long getDiskBandwidth(DiskBalancerWorkItem item) {
+      final long planBandwidth = item.getBandwidth();
+      return (planBandwidth > 0) ? planBandwidth : this.diskBandwidth;
+    }
+
+    /**
+     * Computes sleep delay needed based on the block that just got copied. We
+     * copy using a burst mode, that is we let the copy proceed in full
+     * throttle. Once a copy is done, we work out how long the same bytes would
+     * take at the user specified bandwidth and sleep for whatever part of that
+     * time the copy did not use. In other words, this code implements a poor
+     * man's token bucket algorithm for traffic shaping.
+     *
+     * @param bytesCopied - bytes moved by the copy that just finished.
+     * @param timeUsed    - duration of that copy as a System.nanoTime() delta.
+     * @param item        DiskBalancerWorkItem
+     * @return sleep delay in Milliseconds, never negative.
+     */
+    private long computeDelay(long bytesCopied, long timeUsed,
+                              DiskBalancerWorkItem item) {
+      final long bandwidth = getDiskBandwidth(item);
+      // Without a timing or a positive bandwidth there is nothing to pace.
+      if (timeUsed <= 0 || bandwidth <= 0) {
+        return 0;
+      }
+      final long megaByte = 1024L * 1024L;
+      // Millis this copy would have taken at exactly "bandwidth" MB/s.
+      long allowed =
+          (bytesCopied * SECONDS.toMillis(1)) / (bandwidth * megaByte);
+      long used = MILLISECONDS.convert(timeUsed, TimeUnit.NANOSECONDS);
+      return Math.max(allowed - used, 0L);
+    }
+
+    /**
+     * Picks the error budget for a work item, falling back to the default.
+     *
+     * @param item - DiskBalancerWorkItem
+     * @return maximum error counts to tolerate.
+     */
+    private long getMaxError(DiskBalancerWorkItem item) {
+      final long planLimit = item.getMaxDiskErrors();
+      return (planLimit > 0) ? planLimit : this.maxDiskErrors;
+    }
+
+    /**
+     * Gets the next block that we can copy, returns null if we cannot find a
+     * block that fits our parameters or if we have run out of blocks.
+     *
+     * @param iter Block Iter
+     * @param item - Work item; its error count grows on each IOException.
+     * @return Extended block or null if no copyable block is found.
+     */
+    private ExtendedBlock getBlockToCopy(FsVolumeSpi.BlockIterator iter,
+                                         DiskBalancerWorkItem item) {
+      while (!iter.atEnd() && item.getErrorCount() < getMaxError(item)) {
+        try {
+          ExtendedBlock block = iter.nextBlock();
+
+          // A valid block is a finalized block, we iterate until we get
+          // finalized blocks
+          if (!this.dataset.isValidBlock(block)) {
+            continue;
+          }
+
+          // We don't look for the best, we just do first fit
+          if (isLessThanNeeded(block.getNumBytes(), item)) {
+            return block;
+          }
+        } catch (IOException e) {
+          LOG.warn("Unable to read the next block from the iterator.", e);
+          item.incErrorCount();
+        }
+      }
+
+      if (item.getErrorCount() >= getMaxError(item)) {
+        item.setErrMsg("Error count exceeded.");
+        // Report the limit actually enforced, which may be the default.
+        LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ",
+            item.getErrorCount(), getMaxError(item));
+      }
+      return null;
+    }
+
+    /**
+     * Creates one block iterator per block pool found on the source volume.
+     *
+     * @param source    Volume whose block pools are enumerated.
+     * @param poolIters Output list that receives the new iterators.
+     */
+    private void openPoolIters(FsVolumeSpi source,
+                               List<FsVolumeSpi.BlockIterator> poolIters) {
+      Preconditions.checkNotNull(source);
+      Preconditions.checkNotNull(poolIters);
+
+      final String iterName = "DiskBalancerSource";
+      for (String poolId : source.getBlockPoolList()) {
+        poolIters.add(source.newBlockIterator(poolId, iterName));
+      }
+    }
+
+    /**
+     * Returns the next block to copy, visiting the block pools round-robin.
+     *
+     * @param poolIters - List of BlockIterators, one per block pool.
+     * @param item      - Work item that limits block size and error count.
+     * @return ExtendedBlock, or null if no pool yields a movable block.
+     */
+    ExtendedBlock getNextBlock(List<FsVolumeSpi.BlockIterator> poolIters,
+                               DiskBalancerWorkItem item) {
+      Preconditions.checkNotNull(poolIters);
+      int currentCount = 0;
+      ExtendedBlock block = null;
+      while (block == null && currentCount < poolIters.size()) {
+        currentCount++;
+        // "poolIndex = poolIndex++ % n" discards the increment, pinning us
+        // to one pool; advance explicitly so every pool gets its turn.
+        poolIndex = (poolIndex + 1) % poolIters.size();
+        block = getBlockToCopy(poolIters.get(poolIndex), item);
+      }
+      if (block == null) {
+        try {
+          item.setErrMsg("No source blocks found to move.");
+          LOG.error("No movable source blocks found. {}", item.toJson());
+        } catch (IOException e) {
+          LOG.error("Unable to get json from Item.");
+        }
+      }
+      return block;
+    }
+
+    /**
+     * Closes all pool iters, logging (not propagating) any close failure.
+     *
+     * @param poolIters List of BlockIters
+     */
+    private void closePoolIters(List<FsVolumeSpi.BlockIterator> poolIters) {
+      Preconditions.checkNotNull(poolIters);
+      for (FsVolumeSpi.BlockIterator poolIter : poolIters) {
+        try {
+          poolIter.close();
+        } catch (IOException closeError) {
+          LOG.error("Error closing a block pool iter. ex: {}", closeError);
+        }
+      }
+    }
+
+    /**
+     * Copies blocks from the pair's source volume to its dest volume.
+     *
+     * @param pair - Source and Destination Volumes.
+     * @param item - Work item tracking bytes to copy, progress and errors.
      */
     @Override
-    public void setExitFlag() {
+    public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
+      FsVolumeSpi source = pair.getSource();
+      FsVolumeSpi dest = pair.getDest();
+      List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
 
+      if (source.isTransientStorage() || dest.isTransientStorage()) {
+        return;
+      }
+
+      try {
+        openPoolIters(source, poolIters);
+        if (poolIters.size() == 0) {
+          LOG.error("No block pools found on volume. volume : {}. Exiting.",
+              source.getBasePath());
+          return;
+        }
+
+        while (shouldRun()) {
+          try {
+
+            // Check for the max error count constraint.
+            if (item.getErrorCount() > getMaxError(item)) {
+              LOG.error("Exceeded the max error count. source {}, dest: {} " +
+                      "error count: {}", source.getBasePath(),
+                  dest.getBasePath(), item.getErrorCount());
+              this.setExitFlag();
+              continue;
+            }
+
+            // Check for the block tolerance constraint.
+            if (isCloseEnough(item)) {
+              LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
+                      "blocks.",
+                  source.getBasePath(), dest.getBasePath(),
+                  item.getBytesCopied(), item.getBlocksCopied());
+              this.setExitFlag();
+              continue;
+            }
+
+            ExtendedBlock block = getNextBlock(poolIters, item);
+            // we are not able to find any blocks to copy.
+            if (block == null) {
+              this.setExitFlag();
+              LOG.error("No source blocks, exiting the copy. Source: {}, " +
+                      "dest:{}", source.getBasePath(), dest.getBasePath());
+              continue;
+            }
+
+            // Check if someone told us to exit while we were looking for a
+            // block. Treat this as an interruption point for the thread,
+            // since both getNextBlock and moveBlockAcrossVolumes can take
+            // some time.
+            if (!shouldRun()) {
+              continue;
+            }
+
+            long timeUsed;
+            // There is a race condition here, but we will get an IOException
+            // if dest has no space, which we handle anyway.
+            if (dest.getAvailable() > item.getBytesToCopy()) {
+              long begin = System.nanoTime();
+              this.dataset.moveBlockAcrossVolumes(block, dest);
+              long now = System.nanoTime();
+              timeUsed = (now - begin) > 0 ? now - begin : 0;
+            } else {
+
+              // Technically it is possible for us to find a smaller block and
+              // make another copy, but opting for the safer choice of just
+              // exiting here.
+              LOG.error("Destination volume: {} does not have enough space to" +
+                  " accommodate a block. Block Size: {} Exiting from" +
+                  " copyBlocks.", dest.getBasePath(), block.getNumBytes());
+              this.setExitFlag();
+              continue;
+            }
+
+            LOG.debug("Moved block with size {} from  {} to {}",
+                block.getNumBytes(), source.getBasePath(),
+                dest.getBasePath());
+
+            item.incCopiedSoFar(block.getNumBytes());
+            item.incBlocksCopied();
+
+            // Check for the max throughput constraint. We sleep here so
+            // that on average we do not copy more than Max MB/sec.
+            // Because we sleep, if a shutdown or cancel call comes in we
+            // exit via Thread Interrupted exception.
+            // NOTE(review): timeUsed is a System.nanoTime() delta, i.e. it
+            // is in nanoseconds when it is handed to computeDelay.
+            Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
+
+          } catch (IOException ex) {
+            LOG.error("Exception while trying to copy blocks. error: {}", ex);
+            item.incErrorCount();
+          } catch (InterruptedException e) {
+            LOG.error("Copy Block Thread interrupted, exiting the copy.");
+            Thread.currentThread().interrupt();
+            item.incErrorCount();
+            this.setExitFlag();
+          }
+        }
+      } finally {
+        // Close all Iters.
+        closePoolIters(poolIters);
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
index 87030db..f70a983 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
@@ -28,7 +28,7 @@ import java.util.Map;
  * between a set of Nodes.
  */
 public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
-  private float nodeDataDensity;
+  private double nodeDataDensity;
   private Map<String, DiskBalancerVolumeSet> volumeSets;
   private String dataNodeUUID;
   private String dataNodeIP;
@@ -159,17 +159,17 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
   public int compareTo(DiskBalancerDataNode that) {
     Preconditions.checkNotNull(that);
 
-    if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+    if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
         < 0) {
       return -1;
     }
 
-    if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+    if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
         == 0) {
       return 0;
     }
 
-    if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+    if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
         > 0) {
       return 1;
     }
@@ -190,7 +190,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
    *
    * @return float
    */
-  public float getNodeDataDensity() {
+  public double getNodeDataDensity() {
     return nodeDataDensity;
   }
 
@@ -201,7 +201,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
    * spread across a set of volumes inside the node.
    */
   public void computeNodeDensity() {
-    float sum = 0;
+    double sum = 0;
     int volcount = 0;
     for (DiskBalancerVolumeSet vset : volumeSets.values()) {
       for (DiskBalancerVolume vol : vset.getVolumes()) {
@@ -249,6 +249,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
       vSet = volumeSets.get(volumeSetKey);
     } else {
       vSet = new DiskBalancerVolumeSet(volume.isTransient());
+      vSet.setStorageType(volumeSetKey);
       volumeSets.put(volumeSetKey, vSet);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
index 24e891f..2a39609 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
@@ -38,7 +38,7 @@ public class DiskBalancerVolume {
   private String uuid;
   private boolean failed;
   private boolean isTransient;
-  private float volumeDataDensity;
+  private double volumeDataDensity;
   private boolean skip = false;
   private boolean isReadOnly;
 
@@ -69,7 +69,7 @@ public class DiskBalancerVolume {
    *
    * @return float.
    */
-  public float getVolumeDataDensity() {
+  public double getVolumeDataDensity() {
     return volumeDataDensity;
   }
 
@@ -78,7 +78,7 @@ public class DiskBalancerVolume {
    *
    * @param volDataDensity - density
    */
-  public void setVolumeDataDensity(float volDataDensity) {
+  public void setVolumeDataDensity(double volDataDensity) {
     this.volumeDataDensity = volDataDensity;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
index 2faf249..70d7536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
@@ -53,7 +53,7 @@ public class DiskBalancerVolumeSet {
   private String storageType;
   private String setID;
 
-  private float idealUsed;
+  private double idealUsed;
 
 
   /**
@@ -142,19 +142,32 @@ public class DiskBalancerVolumeSet {
     }
 
     if (totalCapacity != 0) {
-      this.idealUsed = totalUsed / (float) totalCapacity;
+      this.idealUsed = truncateDecimals(totalUsed /
+          (double) totalCapacity);
     }
 
     for (DiskBalancerVolume volume : volumes) {
       if (!volume.isFailed() && !volume.isSkip()) {
-        float dfsUsedRatio =
-            volume.getUsed() / (float) volume.computeEffectiveCapacity();
+        double dfsUsedRatio =
+            truncateDecimals(volume.getUsed() /
+                (double) volume.computeEffectiveCapacity());
+
         volume.setVolumeDataDensity(this.idealUsed - dfsUsedRatio);
         sortedQueue.add(volume);
       }
     }
   }
 
+  /**
+   * Truncates (does not round) to 4 decimal digits, since uncontrolled
+   * precision is sometimes counter-intuitive to what users expect.
+   * @param value - double.
+   * @return value with everything past the fourth decimal digit dropped.
+   */
+  private double truncateDecimals(double value) {
+    final int multiplier = 10000;
+    return (double) ((long) (value * multiplier)) / multiplier;
+  }
   private void skipMisConfiguredVolume(DiskBalancerVolume volume) {
     //probably points to some sort of mis-configuration. Log this and skip
     // processing this volume.
@@ -255,7 +268,7 @@ public class DiskBalancerVolumeSet {
    * @return true if balancing is needed false otherwise.
    */
   public boolean isBalancingNeeded(float thresholdPercentage) {
-    float threshold = thresholdPercentage / 100.0f;
+    double threshold = thresholdPercentage / 100.0d;
 
     if(volumes == null || volumes.size() <= 1) {
       // there is nothing we can do with a single volume.
@@ -265,7 +278,10 @@ public class DiskBalancerVolumeSet {
 
     for (DiskBalancerVolume vol : volumes) {
       boolean notSkip = !vol.isFailed() && !vol.isTransient() && !vol.isSkip();
-      if ((Math.abs(vol.getVolumeDataDensity()) > threshold) && notSkip) {
+      Double absDensity =
+          truncateDecimals(Math.abs(vol.getVolumeDataDensity()));
+
+      if ((absDensity > threshold) && notSkip) {
         return true;
       }
     }
@@ -306,7 +322,7 @@ public class DiskBalancerVolumeSet {
    */
 
   @JsonIgnore
-  public float getIdealUsed() {
+  public double getIdealUsed() {
     return this.idealUsed;
   }
 
@@ -319,8 +335,8 @@ public class DiskBalancerVolumeSet {
      */
     @Override
     public int compare(DiskBalancerVolume first, DiskBalancerVolume second) {
-      return Float
-          .compare(second.getVolumeDataDensity(), first.getVolumeDataDensity());
+      return Double.compare(second.getVolumeDataDensity(),
+          first.getVolumeDataDensity());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
index 9a493a5..b5f68fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
@@ -38,7 +38,7 @@ import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude;
 public class MoveStep implements Step {
   private DiskBalancerVolume sourceVolume;
   private DiskBalancerVolume destinationVolume;
-  private float idealStorage;
+  private double idealStorage;
   private long bytesToMove;
   private String volumeSetID;
 
@@ -55,7 +55,7 @@ public class MoveStep implements Step {
    * @param bytesToMove       - number of bytes to move
    * @param volumeSetID       - a diskBalancer generated id.
    */
-  public MoveStep(DiskBalancerVolume sourceVolume, float idealStorage,
+  public MoveStep(DiskBalancerVolume sourceVolume, double idealStorage,
                   DiskBalancerVolume destinationVolume, long bytesToMove,
                   String volumeSetID) {
     this.destinationVolume = destinationVolume;
@@ -98,7 +98,7 @@ public class MoveStep implements Step {
    * @return float
    */
   @Override
-  public float getIdealStorage() {
+  public double getIdealStorage() {
     return idealStorage;
   }
 
@@ -146,7 +146,7 @@ public class MoveStep implements Step {
    *
    * @param idealStorage - ideal Storage
    */
-  public void setIdealStorage(float idealStorage) {
+  public void setIdealStorage(double idealStorage) {
     this.idealStorage = idealStorage;
   }
 
@@ -199,6 +199,7 @@ public class MoveStep implements Step {
    * move operation is aborted.
    * @return  long.
    */
+  @Override
   public long getMaxDiskErrors() {
     return maxDiskErrors;
   }
@@ -208,6 +209,7 @@ public class MoveStep implements Step {
    * step is aborted.
    * @param maxDiskErrors - long
    */
+  @Override
   public void setMaxDiskErrors(long maxDiskErrors) {
     this.maxDiskErrors = maxDiskErrors;
   }
@@ -223,6 +225,7 @@ public class MoveStep implements Step {
    *
    * @return tolerance percentage.
    */
+  @Override
   public long getTolerancePercent() {
     return tolerancePercent;
   }
@@ -231,6 +234,7 @@ public class MoveStep implements Step {
    * Sets the tolerance percentage.
    * @param tolerancePercent  - long
    */
+  @Override
   public void setTolerancePercent(long tolerancePercent) {
     this.tolerancePercent = tolerancePercent;
   }
@@ -241,6 +245,7 @@ public class MoveStep implements Step {
    * datanode while data node is in use.
    * @return  long.
    */
+  @Override
   public long getBandwidth() {
     return bandwidth;
   }
@@ -250,6 +255,7 @@ public class MoveStep implements Step {
    * @param bandwidth  - Long, MB / Sec of data to be moved between
    *                   source and destinatin volume.
    */
+  @Override
   public void setBandwidth(long bandwidth) {
     this.bandwidth = bandwidth;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
index f13909f..8f69653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
@@ -42,7 +42,7 @@ public interface Step {
    *
    * @return idealStorage
    */
-  float getIdealStorage();
+  double getIdealStorage();
 
   /**
    * Gets Source Volume.
@@ -87,5 +87,23 @@ public interface Step {
    */
   long getBandwidth();
 
+  /**
+   * Sets Tolerance percent on a specific step.
+   * @param tolerancePercent - tolerance in percentage.
+   */
+  void setTolerancePercent(long tolerancePercent);
+
+  /**
+   * Set Bandwidth on a specific step.
+   * @param bandwidth - in MB/s
+   */
+  void setBandwidth(long bandwidth);
+
+  /**
+   * Set maximum errors to tolerate before disk balancer step fails.
+   * @param maxDiskErrors - error count.
+   */
+  void setMaxDiskErrors(long maxDiskErrors);
+
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 738cfe6..1cfd488 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -264,7 +264,8 @@ public class TestBalancer {
   }
 
   /* create a file with a length of <code>fileLen</code> */
-  static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
+  public static void createFile(MiniDFSCluster cluster, Path filePath, long
+      fileLen,
       short replicationFactor, int nnIndex)
   throws IOException, InterruptedException, TimeoutException {
     FileSystem fs = cluster.getFileSystem(nnIndex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
new file mode 100644
index 0000000..f50637c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestDiskBalancer {
+
+  @Test
+  public void TestDiskBalancerNameNodeConnectivity() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    final int numDatanodes = 2;
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes).build();
+    try {
+      cluster.waitActive();
+      ClusterConnector nameNodeConnector =
+          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+      DiskBalancerCluster DiskBalancerCluster = new DiskBalancerCluster
+          (nameNodeConnector);
+      DiskBalancerCluster.readClusterInfo();
+      assertEquals(DiskBalancerCluster.getNodes().size(), numDatanodes);
+      DataNode dnNode = cluster.getDataNodes().get(0);
+      DiskBalancerDataNode dbDnNode =
+          DiskBalancerCluster.getNodeByUUID(dnNode.getDatanodeUuid());
+      assertEquals(dnNode.getDatanodeUuid(), dbDnNode.getDataNodeUUID());
+      assertEquals(dnNode.getDatanodeId().getIpAddr(),
+          dbDnNode.getDataNodeIP());
+      assertEquals(dnNode.getDatanodeId().getHostName(),
+          dbDnNode.getDataNodeName());
+      try (FsDatasetSpi.FsVolumeReferences ref = dnNode.getFSDataset()
+          .getFsVolumeReferences()) {
+        assertEquals(ref.size(), dbDnNode.getVolumeCount());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * This test simulates a real Data node working with DiskBalancer.
+   *
+   * Here is the overview of this test.
+   *
+   * 1. Write a bunch of blocks and move them to one disk to create imbalance.
+   * 2. Rewrite the capacity of the disks in DiskBalancer Model so that
+   * planner will produce a move plan.
+   * 3. Execute the move plan and wait until the plan is done.
+   * 4. Verify the source disk has blocks now.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void TestDiskBalancerEndToEnd() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int DEFAULT_BLOCK_SIZE = 100;
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    final int numDatanodes = 1;
+    final String fileName = "/tmp.txt";
+    final Path filePath = new Path(fileName);
+    final int blocks = 100;
+    final int blocksSize = 1024;
+    final int fileLen = blocks * blocksSize;
+
+
+    // Write a file and restart the cluster
+    long [] capacities = new long[]{ DEFAULT_BLOCK_SIZE * 2 * fileLen,
+        DEFAULT_BLOCK_SIZE * 2 * fileLen };
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .storageCapacities(capacities)
+        .storageTypes(new StorageType[] {StorageType.DISK, StorageType.DISK})
+        .storagesPerDatanode(2)
+        .build();
+    FsVolumeImpl source = null;
+    FsVolumeImpl dest = null;
+    try {
+      cluster.waitActive();
+      Random r = new Random();
+      FileSystem fs = cluster.getFileSystem(0);
+      TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
+          numDatanodes - 1);
+
+      DFSTestUtil.waitReplication(fs, filePath, (short) 1);
+      cluster.restartDataNodes();
+      cluster.waitActive();
+
+      // Get the data node and move all data to one disk.
+      DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1);
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dnNode.getFSDataset().getFsVolumeReferences()) {
+        source = (FsVolumeImpl) refs.get(0);
+        dest = (FsVolumeImpl) refs.get(1);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+        DiskBalancerTestUtil.moveAllDataToDestVolume(
+            dnNode.getFSDataset(), source, dest);
+       assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+      }
+
+      cluster.restartDataNodes();
+      cluster.waitActive();
+
+      // Start up a disk balancer and read the cluster info.
+      final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1);
+      ClusterConnector nameNodeConnector =
+          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+      DiskBalancerCluster diskBalancerCluster =
+          new DiskBalancerCluster(nameNodeConnector);
+      diskBalancerCluster.readClusterInfo();
+      List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
+
+      // Rewrite the capacity in the model to show that disks need
+      // re-balancing.
+      setVolumeCapacity(diskBalancerCluster, DEFAULT_BLOCK_SIZE * 2 * fileLen, "DISK");
+      // Pick a node to process.
+      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode
+          .getDatanodeUuid()));
+      diskBalancerCluster.setNodesToProcess(nodesToProcess);
+
+      // Compute a plan.
+      List<NodePlan> clusterplan = diskBalancerCluster.computePlan(0.0f);
+
+      // Now we must have a plan, since the node is imbalanced and we
+      // asked the disk balancer to create a plan.
+      assertTrue(clusterplan.size() == 1);
+
+      NodePlan plan = clusterplan.get(0);
+      plan.setNodeUUID(dnNode.getDatanodeUuid());
+      plan.setTimeStamp(Time.now());
+      String planJson = plan.toJson();
+      String planID = DigestUtils.sha512Hex(planJson);
+      assertNotNull(plan.getVolumeSetPlans());
+      assertTrue(plan.getVolumeSetPlans().size() > 0);
+      plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
+
+
+      // Submit the plan and wait till the execution is done.
+      newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            return newDN.queryDiskBalancerPlan().getResult() ==
+                DiskBalancerWorkStatus.Result.PLAN_DONE;
+          } catch (IOException ex) {
+            return false;
+          }
+        }
+      }, 1000, 100000);
+
+
+      //verify that it worked.
+      dnNode = cluster.getDataNodes().get(numDatanodes - 1);
+      assertEquals(dnNode.queryDiskBalancerPlan().getResult(),
+          DiskBalancerWorkStatus.Result.PLAN_DONE);
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dnNode.getFSDataset().getFsVolumeReferences()) {
+        source = (FsVolumeImpl) refs.get(0);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+      }
+
+
+
+      // Allow a 10% tolerance when comparing moved bytes with the plan.
+      long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove()
+          * 10) / 100;
+      assertTrue(
+          (DiskBalancerTestUtil.getBlockCount(source) *
+              DEFAULT_BLOCK_SIZE + delta) >=
+              plan.getVolumeSetPlans().get(0).getBytesToMove());
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Sets the capacity of every disk of type {@code diskType} to {@code size}.
+   * @param cluster   - DiskBalancerCluster
+   * @param size   - new size of the disk
+   */
+  private void setVolumeCapacity(DiskBalancerCluster cluster, long size,
+                                 String diskType) {
+    Preconditions.checkNotNull(cluster);
+    for(DiskBalancerDataNode node : cluster.getNodes()) {
+      for (DiskBalancerVolume vol :
+          node.getVolumeSets().get(diskType).getVolumes()) {
+        vol.setCapacity(size);
+      }
+      node.getVolumeSets().get(diskType).computeVolumeDataDensity();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
index ad18075..77c2aa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
@@ -21,11 +21,9 @@ import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
-    .DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
-    .DiskBalancerVolumeSet;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
@@ -143,9 +141,9 @@ public class TestPlanner {
     cluster.readClusterInfo();
     Assert.assertEquals(1, cluster.getNodes().size());
 
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
-        ());
+    GreedyPlanner planner = new GreedyPlanner(5.0f, node);
+    NodePlan plan = new NodePlan(node.getDataNodeUUID(),
+        node.getDataNodePort());
     planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
 
     // We should have only one planned move from
@@ -184,8 +182,8 @@ public class TestPlanner {
     cluster.readClusterInfo();
     Assert.assertEquals(1, cluster.getNodes().size());
 
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
+    GreedyPlanner planner = new GreedyPlanner(5.0f, node);
+    NodePlan plan = new NodePlan(node.getDataNodeUUID(), node.getDataNodePort
         ());
     planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
 
@@ -262,11 +260,10 @@ public class TestPlanner {
     assertEquals(2, plan.getVolumeSetPlans().size());
     Step step = plan.getVolumeSetPlans().get(0);
     assertEquals("volume100", step.getSourceVolume().getPath());
-    assertEquals("33.3 G", step.getSizeString(step.getBytesToMove()));
-
+    assertTrue(step.getSizeString(step.getBytesToMove()).matches("33.[2|3|4] G"));
     step = plan.getVolumeSetPlans().get(1);
     assertEquals("volume100", step.getSourceVolume().getPath());
-    assertEquals("33.3 G", step.getSizeString(step.getBytesToMove()));
+    assertTrue(step.getSizeString(step.getBytesToMove()).matches("33.[2|3|4] G"));
   }
 
   @Test
@@ -318,11 +315,12 @@ public class TestPlanner {
 
     Step step = newPlan.getVolumeSetPlans().get(0);
     assertEquals("volume100", step.getSourceVolume().getPath());
-    assertEquals("18.8 G", step.getSizeString(step.getBytesToMove()));
+    assertTrue(step.getSizeString(step.getBytesToMove()).matches("18.[6|7|8] G"));
 
     step = newPlan.getVolumeSetPlans().get(1);
     assertEquals("volume100", step.getSourceVolume().getPath());
-    assertEquals("18.8 G", step.getSizeString(step.getBytesToMove()));
+    assertTrue(step.getSizeString(step.getBytesToMove()).matches("18.[6|7|8] G"));
+
   }
 
   @Test
@@ -364,7 +362,7 @@ public class TestPlanner {
 
       if (step.getDestinationVolume().getPath().equals("volume0-1")) {
         assertEquals("volume100", step.getSourceVolume().getPath());
-        assertEquals("28.6 G",
+        assertEquals("28.5 G",
             step.getSizeString(step.getBytesToMove()));
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org