Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/09/14 02:43:49 UTC

[11/50] [abbrv] hadoop git commit: HDFS-10808. DiskBalancer does not execute multi-steps plan-redux. Contributed by Anu Engineer.

HDFS-10808. DiskBalancer does not execute multi-steps plan-redux. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bee9f57f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bee9f57f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bee9f57f

Branch: refs/heads/YARN-2915
Commit: bee9f57f5ca9f037ade932c6fd01b0dad47a1296
Parents: cba973f
Author: Anu Engineer <ae...@apache.org>
Authored: Fri Sep 9 15:00:39 2016 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Fri Sep 9 15:00:39 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/DiskBalancer.java      |  31 +-
 .../server/diskbalancer/TestDiskBalancer.java   | 492 ++++++++++++-------
 .../TestDiskBalancerWithMockMover.java          |  17 +-
 3 files changed, 339 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
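
For review context, the behavioural change is easy to misread in diff form:
the executor thread now calls blockMover.setRunnable() before copying blocks
for each VolumePair in the plan, and the termination paths inside copyBlocks()
raise the exit flag and continue instead of break-ing out, so every step ends
through the same shouldRun() check and the next step starts with a fresh flag.
A minimal, self-contained sketch of that control-flow pattern (a hypothetical
demo class, not the committed code):

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class MultiStepDemo {
      // Stand-in for the mover's run flag (setRunnable/setExitFlag/shouldRun).
      private final AtomicBoolean runFlag = new AtomicBoolean(false);

      void setRunnable() { runFlag.set(true); }
      void setExitFlag() { runFlag.set(false); }
      boolean shouldRun() { return runFlag.get(); }

      // One "copyBlocks" step: it stops itself by raising the exit flag,
      // never by break-ing, so the caller's loop over steps is unaffected.
      void copyBlocks(String step) {
        int moved = 0;
        while (shouldRun()) {
          if (moved >= 3) {      // e.g. tolerance reached or no source blocks
            setExitFlag();       // ends this step only
            continue;
          }
          moved++;
        }
        System.out.println(step + " moved " + moved + " blocks");
      }

      public static void main(String[] args) {
        MultiStepDemo mover = new MultiStepDemo();
        for (String step : Arrays.asList("disk0->disk1", "disk0->disk2")) {
          mover.setRunnable();   // re-arm the flag before every plan step
          mover.copyBlocks(step);
        }
      }
    }

Running the sketch prints a block count for both steps; the point of the
per-step setRunnable() call is that raising the exit flag inside one step
cannot suppress the steps that follow it.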


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index e9e2e5b..d853ae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -501,14 +501,11 @@ public class DiskBalancer {
       public void run() {
         Thread.currentThread().setName("DiskBalancerThread");
         LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
-                planFile, planID);
-        try {
-          for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
-              workMap.entrySet()) {
-            blockMover.copyBlocks(entry.getKey(), entry.getValue());
-          }
-        } finally {
-          blockMover.setExitFlag();
+            planFile, planID);
+        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+            workMap.entrySet()) {
+          blockMover.setRunnable();
+          blockMover.copyBlocks(entry.getKey(), entry.getValue());
         }
       }
     });
@@ -857,8 +854,8 @@ public class DiskBalancer {
 
       if (item.getErrorCount() >= getMaxError(item)) {
         item.setErrMsg("Error count exceeded.");
-        LOG.info("Maximum error count exceeded. Error count: {} Max error:{} "
-            , item.getErrorCount(), item.getMaxDiskErrors());
+        LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ",
+            item.getErrorCount(), item.getMaxDiskErrors());
       }
 
       return null;
@@ -962,7 +959,8 @@ public class DiskBalancer {
               LOG.error("Exceeded the max error count. source {}, dest: {} " +
                       "error count: {}", source.getBasePath(),
                   dest.getBasePath(), item.getErrorCount());
-              break;
+              this.setExitFlag();
+              continue;
             }
 
             // Check for the block tolerance constraint.
@@ -971,7 +969,8 @@ public class DiskBalancer {
                       "blocks.",
                   source.getBasePath(), dest.getBasePath(),
                   item.getBytesCopied(), item.getBlocksCopied());
-              break;
+              this.setExitFlag();
+              continue;
             }
 
             ExtendedBlock block = getNextBlock(poolIters, item);
@@ -979,7 +978,8 @@ public class DiskBalancer {
             if (block == null) {
               LOG.error("No source blocks, exiting the copy. Source: {}, " +
                   "dest:{}", source.getBasePath(), dest.getBasePath());
-              break;
+              this.setExitFlag();
+              continue;
             }
 
             // check if someone told us exit, treat this as an interruption
@@ -987,7 +987,7 @@ public class DiskBalancer {
             // for the thread, since both getNextBlock and moveBlocAcrossVolume
             // can take some time.
             if (!shouldRun()) {
-              break;
+              continue;
             }
 
             long timeUsed;
@@ -1006,7 +1006,8 @@ public class DiskBalancer {
               LOG.error("Destination volume: {} does not have enough space to" +
                   " accommodate a block. Block Size: {} Exiting from" +
                   " copyBlocks.", dest.getBasePath(), block.getNumBytes());
-              break;
+              this.setExitFlag();
+              continue;
             }
 
             LOG.debug("Moved block with size {} from  {} to {}",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index dc177fd..eb15bdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
@@ -37,19 +36,18 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
-    .DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
@@ -62,6 +60,7 @@ import static org.junit.Assert.assertTrue;
 public class TestDiskBalancer {
 
   private static final String PLAN_FILE = "/system/current.plan.json";
+  static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class);
 
   @Test
   public void testDiskBalancerNameNodeConnectivity() throws Exception {
@@ -110,61 +109,278 @@ public class TestDiskBalancer {
    */
   @Test
   public void testDiskBalancerEndToEnd() throws Exception {
+
     Configuration conf = new HdfsConfiguration();
-    final int defaultBlockSize = 100;
     conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    final int numDatanodes = 1;
-    final String fileName = "/tmp.txt";
-    final Path filePath = new Path(fileName);
-    final int blocks = 100;
-    final int blocksSize = 1024;
-    final int fileLen = blocks * blocksSize;
-
-
-    // Write a file and restart the cluster
-    long[] capacities = new long[]{defaultBlockSize * 2 * fileLen,
-        defaultBlockSize * 2 * fileLen};
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(numDatanodes)
-        .storageCapacities(capacities)
-        .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
-        .storagesPerDatanode(2)
+    final int blockCount = 100;
+    final int blockSize = 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+
+    MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount)
+        .setBlockSize(blockSize)
+        .setDiskCount(diskCount)
+        .setNumDatanodes(dataNodeCount)
+        .setConf(conf)
+        .build();
+    try {
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+      dataMover.executePlan(plan);
+      dataMover.verifyPlanExectionDone();
+      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBalanceDataBetweenMultiplePairsOfVolumes()
+      throws Exception {
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    final int blockCount = 1000;
+    final int blockSize = 1024;
+
+    // Create 3 disks, which means we will have 2 plan steps:
+    // move data from disk0->disk1 and disk0->disk2.
+    final int diskCount = 3;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+
+
+    MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount)
+        .setBlockSize(blockSize)
+        .setDiskCount(diskCount)
+        .setNumDatanodes(dataNodeCount)
+        .setConf(conf)
         .build();
-    FsVolumeImpl source = null;
-    FsVolumeImpl dest = null;
+
+
     try {
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+
+      // With 3 disks the plan should move data to both destination disks,
+      // so we must have 2 plan steps.
+      assertEquals(plan.getVolumeSetPlans().size(), 2);
+
+      dataMover.executePlan(plan);
+      dataMover.verifyPlanExectionDone();
+      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Sets all disks' capacity to the size specified.
+   *
+   * @param cluster - DiskBalancerCluster
+   * @param size    - new size of the disk
+   */
+  private void setVolumeCapacity(DiskBalancerCluster cluster, long size,
+                                 String diskType) {
+    Preconditions.checkNotNull(cluster);
+    for (DiskBalancerDataNode node : cluster.getNodes()) {
+      for (DiskBalancerVolume vol :
+          node.getVolumeSets().get(diskType).getVolumes()) {
+        vol.setCapacity(size);
+      }
+      node.getVolumeSets().get(diskType).computeVolumeDataDensity();
+    }
+  }
+
+  /**
+   * Helper class that allows us to create different kinds of MiniDFSClusters
+   * and populate data.
+   */
+  static class ClusterBuilder {
+    private Configuration conf;
+    private int blockSize;
+    private int numDatanodes;
+    private int fileLen;
+    private int blockCount;
+    private int diskCount;
+
+    public ClusterBuilder setConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public ClusterBuilder setBlockSize(int blockSize) {
+      this.blockSize = blockSize;
+      return this;
+    }
+
+    public ClusterBuilder setNumDatanodes(int datanodeCount) {
+      this.numDatanodes = datanodeCount;
+      return this;
+    }
+
+    public ClusterBuilder setBlockCount(int blockCount) {
+      this.blockCount = blockCount;
+      return this;
+    }
+
+    public ClusterBuilder setDiskCount(int diskCount) {
+      this.diskCount = diskCount;
+      return this;
+    }
+
+    private long[] getCapacities(int diskCount, int bSize, int fSize) {
+      Preconditions.checkState(diskCount > 0);
+      long[] capacities = new long[diskCount];
+      for (int x = 0; x < diskCount; x++) {
+        capacities[x] = diskCount * bSize * fSize * 2L;
+      }
+      return capacities;
+    }
+
+    private StorageType[] getStorageTypes(int diskCount) {
+      Preconditions.checkState(diskCount > 0);
+      StorageType[] array = new StorageType[diskCount];
+      for (int x = 0; x < diskCount; x++) {
+        array[x] = StorageType.DISK;
+      }
+      return array;
+    }
+
+    public MiniDFSCluster build() throws IOException, TimeoutException,
+        InterruptedException {
+      Preconditions.checkNotNull(this.conf);
+      Preconditions.checkState(blockSize > 0);
+      Preconditions.checkState(numDatanodes > 0);
+      fileLen = blockCount * blockSize;
+      Preconditions.checkState(fileLen > 0);
+      conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+      conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+      conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+      final String fileName = "/tmp.txt";
+      Path filePath = new Path(fileName);
+      fileLen = blockCount * blockSize;
+
+
+      // Write a file and restart the cluster
+      MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(numDatanodes)
+          .storageCapacities(getCapacities(diskCount, blockSize, fileLen))
+          .storageTypes(getStorageTypes(diskCount))
+          .storagesPerDatanode(diskCount)
+          .build();
+      generateData(filePath, cluster);
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      return cluster;
+    }
+
+    private void generateData(Path filePath, MiniDFSCluster cluster)
+        throws IOException, InterruptedException, TimeoutException {
       cluster.waitActive();
-      Random r = new Random();
       FileSystem fs = cluster.getFileSystem(0);
       TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
           numDatanodes - 1);
-
       DFSTestUtil.waitReplication(fs, filePath, (short) 1);
       cluster.restartDataNodes();
       cluster.waitActive();
+    }
+  }
 
-      // Get the data node and move all data to one disk.
-      DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1);
-      try (FsDatasetSpi.FsVolumeReferences refs =
-               dnNode.getFSDataset().getFsVolumeReferences()) {
-        source = (FsVolumeImpl) refs.get(0);
-        dest = (FsVolumeImpl) refs.get(1);
-        assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
-        DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
-            source, dest);
-        assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
-      }
+  class DataMover {
+    private final MiniDFSCluster cluster;
+    private final int sourceDiskIndex;
+    private final int dataNodeIndex;
+    private final Configuration conf;
+    private final int blockCount;
+    private final int blockSize;
+    private DataNode node;
+
+    /**
+     * Constructs a DataMover class.
+     *
+     * @param cluster         - MiniDFSCluster.
+     * @param dataNodeIndex   - Datanode to operate against.
+     * @param sourceDiskIndex - source Disk Index.
+     */
+    public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int
+        sourceDiskIndex, Configuration conf, int blockSize, int
+                         blockCount) {
+      this.cluster = cluster;
+      this.dataNodeIndex = dataNodeIndex;
+      this.node = cluster.getDataNodes().get(dataNodeIndex);
+      this.sourceDiskIndex = sourceDiskIndex;
+      this.conf = conf;
+      this.blockCount = blockCount;
+      this.blockSize = blockSize;
+    }
 
+    /**
+     * Moves all data onto the source disk to create an imbalance so we can
+     * run the planner.
+     *
+     * @throws IOException
+     */
+    public void moveDataToSourceDisk() throws IOException {
+      moveAllDataToDestDisk(this.node, sourceDiskIndex);
       cluster.restartDataNodes();
       cluster.waitActive();
 
+    }
+
+    /**
+     * Moves all data in the data node to one disk.
+     *
+     * @param dataNode      - Datanode
+     * @param destDiskindex - Index of the destination disk.
+     */
+    private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex)
+        throws IOException {
+      Preconditions.checkNotNull(dataNode);
+      Preconditions.checkState(destDiskindex >= 0);
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dataNode.getFSDataset().getFsVolumeReferences()) {
+        if (refs.size() <= destDiskindex) {
+          throw new IllegalArgumentException("Invalid Disk index.");
+        }
+        FsVolumeImpl dest = (FsVolumeImpl) refs.get(destDiskindex);
+        for (int x = 0; x < refs.size(); x++) {
+          if (x == destDiskindex) {
+            continue;
+          }
+          FsVolumeImpl source = (FsVolumeImpl) refs.get(x);
+          DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(),
+              source, dest);
+
+        }
+      }
+    }
+
+    /**
+     * Generates a NodePlan for the datanode specified.
+     *
+     * @return NodePlan.
+     */
+    public NodePlan generatePlan() throws Exception {
+
       // Start up a disk balancer and read the cluster info.
-      final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1);
+      node = cluster.getDataNodes().get(dataNodeIndex);
       ClusterConnector nameNodeConnector =
-          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+          ConnectorFactory.getCluster(cluster.getFileSystem(dataNodeIndex)
+              .getUri(), conf);
 
       DiskBalancerCluster diskBalancerCluster =
           new DiskBalancerCluster(nameNodeConnector);
@@ -173,11 +389,11 @@ public class TestDiskBalancer {
 
       // Rewrite the capacity in the model to show that disks need
       // re-balancing.
-      setVolumeCapacity(diskBalancerCluster, defaultBlockSize * 2 * fileLen,
+      setVolumeCapacity(diskBalancerCluster, blockSize * 2L * blockCount,
           "DISK");
       // Pick a node to process.
-      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode
-          .getDatanodeUuid()));
+      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
+          node.getDatanodeUuid()));
       diskBalancerCluster.setNodesToProcess(nodesToProcess);
 
       // Compute a plan.
@@ -188,169 +404,91 @@ public class TestDiskBalancer {
       assertTrue(clusterplan.size() == 1);
 
       NodePlan plan = clusterplan.get(0);
-      plan.setNodeUUID(dnNode.getDatanodeUuid());
+      plan.setNodeUUID(node.getDatanodeUuid());
       plan.setTimeStamp(Time.now());
-      String planJson = plan.toJson();
-      String planID = DigestUtils.shaHex(planJson);
+
       assertNotNull(plan.getVolumeSetPlans());
       assertTrue(plan.getVolumeSetPlans().size() > 0);
       plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
+      return plan;
+    }
+
+    /**
+     * Submits the plan and waits for the execution to finish.
+     */
+    public void executePlan(NodePlan plan) throws
+        IOException, TimeoutException, InterruptedException {
+
+      node = cluster.getDataNodes().get(dataNodeIndex);
+      String planJson = plan.toJson();
+      String planID = DigestUtils.shaHex(planJson);
 
       // Submit the plan and wait till the execution is done.
-      newDN.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
-      String jmxString = newDN.getDiskBalancerStatus();
+      node.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson,
+          false);
+      String jmxString = node.getDiskBalancerStatus();
       assertNotNull(jmxString);
       DiskBalancerWorkStatus status =
           DiskBalancerWorkStatus.parseJson(jmxString);
-      DiskBalancerWorkStatus realStatus = newDN.queryDiskBalancerPlan();
+      DiskBalancerWorkStatus realStatus = node.queryDiskBalancerPlan();
       assertEquals(realStatus.getPlanID(), status.getPlanID());
 
       GenericTestUtils.waitFor(new Supplier<Boolean>() {
         @Override
         public Boolean get() {
           try {
-            return newDN.queryDiskBalancerPlan().getResult() ==
+            return node.queryDiskBalancerPlan().getResult() ==
                 DiskBalancerWorkStatus.Result.PLAN_DONE;
           } catch (IOException ex) {
             return false;
           }
         }
       }, 1000, 100000);
-
-
-      //verify that it worked.
-      dnNode = cluster.getDataNodes().get(numDatanodes - 1);
-      assertEquals(dnNode.queryDiskBalancerPlan().getResult(),
-          DiskBalancerWorkStatus.Result.PLAN_DONE);
-      try (FsDatasetSpi.FsVolumeReferences refs =
-               dnNode.getFSDataset().getFsVolumeReferences()) {
-        source = (FsVolumeImpl) refs.get(0);
-        assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
-      }
-
-
-      // Tolerance
-      long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove()
-          * 10) / 100;
-      assertTrue(
-          (DiskBalancerTestUtil.getBlockCount(source) *
-              defaultBlockSize + delta) >=
-              plan.getVolumeSetPlans().get(0).getBytesToMove());
-
-    } finally {
-      cluster.shutdown();
     }
-  }
-
-  @Test(timeout=60000)
-  public void testBalanceDataBetweenMultiplePairsOfVolumes()
-      throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    final int DEFAULT_BLOCK_SIZE = 2048;
-    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    final int NUM_DATANODES = 1;
-    final long CAP = 512 * 1024;
-    final Path testFile = new Path("/testfile");
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(NUM_DATANODES)
-        .storageCapacities(new long[]{CAP, CAP, CAP, CAP})
-        .storagesPerDatanode(4)
-        .build();
-    try {
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
-      TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0);
-
-      DFSTestUtil.waitReplication(fs, testFile, (short) 1);
-      DataNode dnNode = cluster.getDataNodes().get(0);
-      // Move data out of two volumes to make them empty.
-      try (FsDatasetSpi.FsVolumeReferences refs =
-               dnNode.getFSDataset().getFsVolumeReferences()) {
-        assertEquals(4, refs.size());
-        for (int i = 0; i < refs.size(); i += 2) {
-          FsVolumeImpl source = (FsVolumeImpl) refs.get(i);
-          FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1);
-          assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
-          DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
-              source, dest);
-          assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
-        }
-      }
-
-      cluster.restartDataNodes();
-      cluster.waitActive();
-
-      // Start up a disk balancer and read the cluster info.
-      final DataNode dataNode = cluster.getDataNodes().get(0);
-      ClusterConnector nameNodeConnector =
-          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
-      DiskBalancerCluster diskBalancerCluster =
-          new DiskBalancerCluster(nameNodeConnector);
-      diskBalancerCluster.readClusterInfo();
-      List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
-      // Rewrite the capacity in the model to show that disks need
-      // re-balancing.
-      setVolumeCapacity(diskBalancerCluster, CAP, "DISK");
-      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
-          dataNode.getDatanodeUuid()));
-      diskBalancerCluster.setNodesToProcess(nodesToProcess);
-
-      // Compute a plan.
-      List<NodePlan> clusterPlan = diskBalancerCluster.computePlan(10.0f);
-
-      NodePlan plan = clusterPlan.get(0);
-      assertEquals(2, plan.getVolumeSetPlans().size());
-      plan.setNodeUUID(dnNode.getDatanodeUuid());
-      plan.setTimeStamp(Time.now());
-      String planJson = plan.toJson();
-      String planID = DigestUtils.shaHex(planJson);
 
-      dataNode.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
-
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          try {
-            return dataNode.queryDiskBalancerPlan().getResult() ==
-                DiskBalancerWorkStatus.Result.PLAN_DONE;
-          } catch (IOException ex) {
-            return false;
-          }
-        }
-      }, 1000, 100000);
-      assertEquals(dataNode.queryDiskBalancerPlan().getResult(),
+    /**
+     * Verifies that the plan execution has completed.
+     */
+    public void verifyPlanExectionDone() throws IOException {
+      node = cluster.getDataNodes().get(dataNodeIndex);
+      assertEquals(node.queryDiskBalancerPlan().getResult(),
           DiskBalancerWorkStatus.Result.PLAN_DONE);
+    }
 
+    /**
+     * Once the disk balancer has run, all volumes must have some data.
+     */
+    public void verifyAllVolumesHaveData() throws IOException {
+      node = cluster.getDataNodes().get(dataNodeIndex);
       try (FsDatasetSpi.FsVolumeReferences refs =
-               dataNode.getFSDataset().getFsVolumeReferences()) {
-        for (FsVolumeSpi vol : refs) {
-          assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0);
+               node.getFSDataset().getFsVolumeReferences()) {
+        for (FsVolumeSpi volume : refs) {
+          assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
+          LOG.info(refs.toString() + " : Block Count : {}",
+              DiskBalancerTestUtil.getBlockCount(volume));
         }
       }
-    } finally {
-      cluster.shutdown();
     }
-  }
 
-  /**
-   * Sets alll Disks capacity to size specified.
-   *
-   * @param cluster - DiskBalancerCluster
-   * @param size    - new size of the disk
-   */
-  private void setVolumeCapacity(DiskBalancerCluster cluster, long size,
-                                 String diskType) {
-    Preconditions.checkNotNull(cluster);
-    for (DiskBalancerDataNode node : cluster.getNodes()) {
-      for (DiskBalancerVolume vol :
-          node.getVolumeSets().get(diskType).getVolumes()) {
-        vol.setCapacity(size);
+    /**
+     * Verifies that tolerance values are honored correctly.
+     */
+    public void verifyTolerance(NodePlan plan, int planIndex, int
+        sourceDiskIndex, int tolerance) throws IOException {
+      // Tolerance
+      long delta = (plan.getVolumeSetPlans().get(planIndex).getBytesToMove()
+          * tolerance) / 100;
+      FsVolumeImpl volume = null;
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               node.getFSDataset().getFsVolumeReferences()) {
+        volume = (FsVolumeImpl) refs.get(sourceDiskIndex);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
+
+        assertTrue(
+            (DiskBalancerTestUtil.getBlockCount(volume) *
+                (blockSize + delta)) >=
+                plan.getVolumeSetPlans().get(0).getBytesToMove());
       }
-      node.getVolumeSets().get(diskType).computeVolumeDataDensity();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
index c362f49..794a887 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
@@ -358,14 +358,13 @@ public class TestDiskBalancerWithMockMover {
 
     private AtomicBoolean shouldRun;
     private FsDatasetSpi dataset;
-    private Integer runCount;
+    private int runCount;
     private volatile boolean sleepInCopyBlocks;
     private long delay;
 
     public TestMover(FsDatasetSpi dataset) {
       this.dataset = dataset;
       this.shouldRun = new AtomicBoolean(false);
-      this.runCount = new Integer(0);
     }
 
     public void setSleep() {
@@ -401,7 +400,7 @@ public class TestDiskBalancerWithMockMover {
         if (delay > 0) {
           Thread.sleep(delay);
         }
-        synchronized (runCount) {
+        synchronized (this) {
           if (shouldRun()) {
             runCount++;
           }
@@ -461,9 +460,9 @@ public class TestDiskBalancerWithMockMover {
     }
 
     public int getRunCount() {
-      synchronized (runCount) {
-        LOG.info("Run count : " + runCount.intValue());
-        return runCount.intValue();
+      synchronized (this) {
+        LOG.info("Run count : " + runCount);
+        return runCount;
       }
     }
   }
@@ -510,7 +509,7 @@ public class TestDiskBalancerWithMockMover {
     }
   }
 
-  private class DiskBalancerBuilder {
+  private static class DiskBalancerBuilder {
     private TestMover blockMover;
     private Configuration conf;
     private String nodeID;
@@ -546,7 +545,7 @@ public class TestDiskBalancerWithMockMover {
     }
   }
 
-  private class DiskBalancerClusterBuilder {
+  private static class DiskBalancerClusterBuilder {
     private String jsonFilePath;
     private Configuration conf;
 
@@ -573,7 +572,7 @@ public class TestDiskBalancerWithMockMover {
     }
   }
 
-  private class PlanBuilder {
+  private static class PlanBuilder {
     private String sourcePath;
     private String destPath;
     private String sourceUUID;
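
A side note on the mock-mover test change above: the run counter used to be an
Integer field that was both incremented and used as the lock. Because
runCount++ on an Integer re-boxes the value into a (potentially) different
object, successive synchronized (runCount) blocks may lock different monitors,
so the counter was not reliably guarded; the patch switches to a plain int
guarded by synchronized (this). A small sketch of the resulting pattern
(hypothetical class, not the test code itself):

    public class RunCounter {
      // Plain int guarded by the instance monitor; the field itself is never
      // used as a lock, so re-assignment cannot change which monitor is held.
      private int runCount;

      public synchronized void increment() {
        runCount++;
      }

      public synchronized int get() {
        return runCount;
      }
    }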

