You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/09/14 02:43:49 UTC
[11/50] [abbrv] hadoop git commit: HDFS-10808. DiskBalancer does not
execute multi-steps plan-redux. Contributed by Anu Engineer.
HDFS-10808. DiskBalancer does not execute multi-steps plan-redux. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bee9f57f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bee9f57f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bee9f57f
Branch: refs/heads/YARN-2915
Commit: bee9f57f5ca9f037ade932c6fd01b0dad47a1296
Parents: cba973f
Author: Anu Engineer <ae...@apache.org>
Authored: Fri Sep 9 15:00:39 2016 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Fri Sep 9 15:00:39 2016 -0700
----------------------------------------------------------------------
.../hdfs/server/datanode/DiskBalancer.java | 31 +-
.../server/diskbalancer/TestDiskBalancer.java | 492 ++++++++++++-------
.../TestDiskBalancerWithMockMover.java | 17 +-
3 files changed, 339 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index e9e2e5b..d853ae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -501,14 +501,11 @@ public class DiskBalancer {
public void run() {
Thread.currentThread().setName("DiskBalancerThread");
LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
- planFile, planID);
- try {
- for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
- workMap.entrySet()) {
- blockMover.copyBlocks(entry.getKey(), entry.getValue());
- }
- } finally {
- blockMover.setExitFlag();
+ planFile, planID);
+ for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+ workMap.entrySet()) {
+ blockMover.setRunnable();
+ blockMover.copyBlocks(entry.getKey(), entry.getValue());
}
}
});
@@ -857,8 +854,8 @@ public class DiskBalancer {
if (item.getErrorCount() >= getMaxError(item)) {
item.setErrMsg("Error count exceeded.");
- LOG.info("Maximum error count exceeded. Error count: {} Max error:{} "
- , item.getErrorCount(), item.getMaxDiskErrors());
+ LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ",
+ item.getErrorCount(), item.getMaxDiskErrors());
}
return null;
@@ -962,7 +959,8 @@ public class DiskBalancer {
LOG.error("Exceeded the max error count. source {}, dest: {} " +
"error count: {}", source.getBasePath(),
dest.getBasePath(), item.getErrorCount());
- break;
+ this.setExitFlag();
+ continue;
}
// Check for the block tolerance constraint.
@@ -971,7 +969,8 @@ public class DiskBalancer {
"blocks.",
source.getBasePath(), dest.getBasePath(),
item.getBytesCopied(), item.getBlocksCopied());
- break;
+ this.setExitFlag();
+ continue;
}
ExtendedBlock block = getNextBlock(poolIters, item);
@@ -979,7 +978,8 @@ public class DiskBalancer {
if (block == null) {
LOG.error("No source blocks, exiting the copy. Source: {}, " +
"dest:{}", source.getBasePath(), dest.getBasePath());
- break;
+ this.setExitFlag();
+ continue;
}
// check if someone told us exit, treat this as an interruption
@@ -987,7 +987,7 @@ public class DiskBalancer {
// for the thread, since both getNextBlock and moveBlocAcrossVolume
// can take some time.
if (!shouldRun()) {
- break;
+ continue;
}
long timeUsed;
@@ -1006,7 +1006,8 @@ public class DiskBalancer {
LOG.error("Destination volume: {} does not have enough space to" +
" accommodate a block. Block Size: {} Exiting from" +
" copyBlocks.", dest.getBasePath(), block.getNumBytes());
- break;
+ this.setExitFlag();
+ continue;
}
LOG.debug("Moved block with size {} from {} to {}",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index dc177fd..eb15bdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
@@ -37,19 +36,18 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
- .DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
@@ -62,6 +60,7 @@ import static org.junit.Assert.assertTrue;
public class TestDiskBalancer {
private static final String PLAN_FILE = "/system/current.plan.json";
+ static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class);
@Test
public void testDiskBalancerNameNodeConnectivity() throws Exception {
@@ -110,61 +109,278 @@ public class TestDiskBalancer {
*/
@Test
public void testDiskBalancerEndToEnd() throws Exception {
+
Configuration conf = new HdfsConfiguration();
- final int defaultBlockSize = 100;
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
- final int numDatanodes = 1;
- final String fileName = "/tmp.txt";
- final Path filePath = new Path(fileName);
- final int blocks = 100;
- final int blocksSize = 1024;
- final int fileLen = blocks * blocksSize;
-
-
- // Write a file and restart the cluster
- long[] capacities = new long[]{defaultBlockSize * 2 * fileLen,
- defaultBlockSize * 2 * fileLen};
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numDatanodes)
- .storageCapacities(capacities)
- .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
- .storagesPerDatanode(2)
+ final int blockCount = 100;
+ final int blockSize = 1024;
+ final int diskCount = 2;
+ final int dataNodeCount = 1;
+ final int dataNodeIndex = 0;
+ final int sourceDiskIndex = 0;
+
+ MiniDFSCluster cluster = new ClusterBuilder()
+ .setBlockCount(blockCount)
+ .setBlockSize(blockSize)
+ .setDiskCount(diskCount)
+ .setNumDatanodes(dataNodeCount)
+ .setConf(conf)
+ .build();
+ try {
+ DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+ sourceDiskIndex, conf, blockSize, blockCount);
+ dataMover.moveDataToSourceDisk();
+ NodePlan plan = dataMover.generatePlan();
+ dataMover.executePlan(plan);
+ dataMover.verifyPlanExectionDone();
+ dataMover.verifyAllVolumesHaveData();
+ dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testBalanceDataBetweenMultiplePairsOfVolumes()
+ throws Exception {
+
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+ final int blockCount = 1000;
+ final int blockSize = 1024;
+
+ // create 3 disks, that means we will have 2 plans
+ // Move Data from disk0->disk1 and disk0->disk2.
+ final int diskCount = 3;
+ final int dataNodeCount = 1;
+ final int dataNodeIndex = 0;
+ final int sourceDiskIndex = 0;
+
+
+ MiniDFSCluster cluster = new ClusterBuilder()
+ .setBlockCount(blockCount)
+ .setBlockSize(blockSize)
+ .setDiskCount(diskCount)
+ .setNumDatanodes(dataNodeCount)
+ .setConf(conf)
.build();
- FsVolumeImpl source = null;
- FsVolumeImpl dest = null;
+
+
try {
+ DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+ sourceDiskIndex, conf, blockSize, blockCount);
+ dataMover.moveDataToSourceDisk();
+ NodePlan plan = dataMover.generatePlan();
+
+ // 3 disks: the plan should move data to both of the other disks,
+ // so we must have 2 plan steps.
+ assertEquals(plan.getVolumeSetPlans().size(), 2);
+
+ dataMover.executePlan(plan);
+ dataMover.verifyPlanExectionDone();
+ dataMover.verifyAllVolumesHaveData();
+ dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Sets the capacity of all disks to the size specified.
+ *
+ * @param cluster - DiskBalancerCluster
+ * @param size - new size of the disk
+ */
+ private void setVolumeCapacity(DiskBalancerCluster cluster, long size,
+ String diskType) {
+ Preconditions.checkNotNull(cluster);
+ for (DiskBalancerDataNode node : cluster.getNodes()) {
+ for (DiskBalancerVolume vol :
+ node.getVolumeSets().get(diskType).getVolumes()) {
+ vol.setCapacity(size);
+ }
+ node.getVolumeSets().get(diskType).computeVolumeDataDensity();
+ }
+ }
+
+ /**
+ * Helper class that allows us to create different kinds of MiniDFSClusters
+ * and populate data.
+ */
+ static class ClusterBuilder {
+ private Configuration conf;
+ private int blockSize;
+ private int numDatanodes;
+ private int fileLen;
+ private int blockCount;
+ private int diskCount;
+
+ public ClusterBuilder setConf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public ClusterBuilder setBlockSize(int blockSize) {
+ this.blockSize = blockSize;
+ return this;
+ }
+
+ public ClusterBuilder setNumDatanodes(int datanodeCount) {
+ this.numDatanodes = datanodeCount;
+ return this;
+ }
+
+ public ClusterBuilder setBlockCount(int blockCount) {
+ this.blockCount = blockCount;
+ return this;
+ }
+
+ public ClusterBuilder setDiskCount(int diskCount) {
+ this.diskCount = diskCount;
+ return this;
+ }
+
+ private long[] getCapacities(int diskCount, int bSize, int fSize) {
+ Preconditions.checkState(diskCount > 0);
+ long[] capacities = new long[diskCount];
+ for (int x = 0; x < diskCount; x++) {
+ capacities[x] = diskCount * bSize * fSize * 2L;
+ }
+ return capacities;
+ }
+
+ private StorageType[] getStorageTypes(int diskCount) {
+ Preconditions.checkState(diskCount > 0);
+ StorageType[] array = new StorageType[diskCount];
+ for (int x = 0; x < diskCount; x++) {
+ array[x] = StorageType.DISK;
+ }
+ return array;
+ }
+
+ public MiniDFSCluster build() throws IOException, TimeoutException,
+ InterruptedException {
+ Preconditions.checkNotNull(this.conf);
+ Preconditions.checkState(blockSize > 0);
+ Preconditions.checkState(numDatanodes > 0);
+ fileLen = blockCount * blockSize;
+ Preconditions.checkState(fileLen > 0);
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+ final String fileName = "/tmp.txt";
+ Path filePath = new Path(fileName);
+ fileLen = blockCount * blockSize;
+
+
+ // Write a file and restart the cluster
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDatanodes)
+ .storageCapacities(getCapacities(diskCount, blockSize, fileLen))
+ .storageTypes(getStorageTypes(diskCount))
+ .storagesPerDatanode(diskCount)
+ .build();
+ generateData(filePath, cluster);
+ cluster.restartDataNodes();
+ cluster.waitActive();
+ return cluster;
+ }
+
+ private void generateData(Path filePath, MiniDFSCluster cluster)
+ throws IOException, InterruptedException, TimeoutException {
cluster.waitActive();
- Random r = new Random();
FileSystem fs = cluster.getFileSystem(0);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
numDatanodes - 1);
-
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
cluster.restartDataNodes();
cluster.waitActive();
+ }
+ }
- // Get the data node and move all data to one disk.
- DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1);
- try (FsDatasetSpi.FsVolumeReferences refs =
- dnNode.getFSDataset().getFsVolumeReferences()) {
- source = (FsVolumeImpl) refs.get(0);
- dest = (FsVolumeImpl) refs.get(1);
- assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
- DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
- source, dest);
- assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
- }
+ class DataMover {
+ private final MiniDFSCluster cluster;
+ private final int sourceDiskIndex;
+ private final int dataNodeIndex;
+ private final Configuration conf;
+ private final int blockCount;
+ private final int blockSize;
+ private DataNode node;
+
+ /**
+ * Constructs a DataMover class.
+ *
+ * @param cluster - MiniDFSCluster.
+ * @param dataNodeIndex - Datanode to operate against.
+ * @param sourceDiskIndex - source Disk Index.
+ */
+ public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int
+ sourceDiskIndex, Configuration conf, int blockSize, int
+ blockCount) {
+ this.cluster = cluster;
+ this.dataNodeIndex = dataNodeIndex;
+ this.node = cluster.getDataNodes().get(dataNodeIndex);
+ this.sourceDiskIndex = sourceDiskIndex;
+ this.conf = conf;
+ this.blockCount = blockCount;
+ this.blockSize = blockSize;
+ }
+ /**
+ * Moves all data to a source disk to create disk imbalance so we can run a
+ * planner.
+ *
+ * @throws IOException
+ */
+ public void moveDataToSourceDisk() throws IOException {
+ moveAllDataToDestDisk(this.node, sourceDiskIndex);
cluster.restartDataNodes();
cluster.waitActive();
+ }
+
+ /**
+ * Moves all data in the data node to one disk.
+ *
+ * @param dataNode - Datanode
+ * @param destDiskindex - Index of the destination disk.
+ */
+ private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex)
+ throws IOException {
+ Preconditions.checkNotNull(dataNode);
+ Preconditions.checkState(destDiskindex >= 0);
+ try (FsDatasetSpi.FsVolumeReferences refs =
+ dataNode.getFSDataset().getFsVolumeReferences()) {
+ if (refs.size() <= destDiskindex) {
+ throw new IllegalArgumentException("Invalid Disk index.");
+ }
+ FsVolumeImpl dest = (FsVolumeImpl) refs.get(destDiskindex);
+ for (int x = 0; x < refs.size(); x++) {
+ if (x == destDiskindex) {
+ continue;
+ }
+ FsVolumeImpl source = (FsVolumeImpl) refs.get(x);
+ DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(),
+ source, dest);
+
+ }
+ }
+ }
+
+ /**
+ * Generates a NodePlan for the datanode specified.
+ *
+ * @return NodePlan.
+ */
+ public NodePlan generatePlan() throws Exception {
+
// Start up a disk balancer and read the cluster info.
- final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1);
+ node = cluster.getDataNodes().get(dataNodeIndex);
ClusterConnector nameNodeConnector =
- ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+ ConnectorFactory.getCluster(cluster.getFileSystem(dataNodeIndex)
+ .getUri(), conf);
DiskBalancerCluster diskBalancerCluster =
new DiskBalancerCluster(nameNodeConnector);
@@ -173,11 +389,11 @@ public class TestDiskBalancer {
// Rewrite the capacity in the model to show that disks need
// re-balancing.
- setVolumeCapacity(diskBalancerCluster, defaultBlockSize * 2 * fileLen,
+ setVolumeCapacity(diskBalancerCluster, blockSize * 2L * blockCount,
"DISK");
// Pick a node to process.
- nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode
- .getDatanodeUuid()));
+ nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
+ node.getDatanodeUuid()));
diskBalancerCluster.setNodesToProcess(nodesToProcess);
// Compute a plan.
@@ -188,169 +404,91 @@ public class TestDiskBalancer {
assertTrue(clusterplan.size() == 1);
NodePlan plan = clusterplan.get(0);
- plan.setNodeUUID(dnNode.getDatanodeUuid());
+ plan.setNodeUUID(node.getDatanodeUuid());
plan.setTimeStamp(Time.now());
- String planJson = plan.toJson();
- String planID = DigestUtils.shaHex(planJson);
+
assertNotNull(plan.getVolumeSetPlans());
assertTrue(plan.getVolumeSetPlans().size() > 0);
plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
+ return plan;
+ }
+
+ /**
+ * Submits the plan and waits for its execution to finish.
+ */
+ public void executePlan(NodePlan plan) throws
+ IOException, TimeoutException, InterruptedException {
+
+ node = cluster.getDataNodes().get(dataNodeIndex);
+ String planJson = plan.toJson();
+ String planID = DigestUtils.shaHex(planJson);
// Submit the plan and wait till the execution is done.
- newDN.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
- String jmxString = newDN.getDiskBalancerStatus();
+ node.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson,
+ false);
+ String jmxString = node.getDiskBalancerStatus();
assertNotNull(jmxString);
DiskBalancerWorkStatus status =
DiskBalancerWorkStatus.parseJson(jmxString);
- DiskBalancerWorkStatus realStatus = newDN.queryDiskBalancerPlan();
+ DiskBalancerWorkStatus realStatus = node.queryDiskBalancerPlan();
assertEquals(realStatus.getPlanID(), status.getPlanID());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
- return newDN.queryDiskBalancerPlan().getResult() ==
+ return node.queryDiskBalancerPlan().getResult() ==
DiskBalancerWorkStatus.Result.PLAN_DONE;
} catch (IOException ex) {
return false;
}
}
}, 1000, 100000);
-
-
- //verify that it worked.
- dnNode = cluster.getDataNodes().get(numDatanodes - 1);
- assertEquals(dnNode.queryDiskBalancerPlan().getResult(),
- DiskBalancerWorkStatus.Result.PLAN_DONE);
- try (FsDatasetSpi.FsVolumeReferences refs =
- dnNode.getFSDataset().getFsVolumeReferences()) {
- source = (FsVolumeImpl) refs.get(0);
- assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
- }
-
-
- // Tolerance
- long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove()
- * 10) / 100;
- assertTrue(
- (DiskBalancerTestUtil.getBlockCount(source) *
- defaultBlockSize + delta) >=
- plan.getVolumeSetPlans().get(0).getBytesToMove());
-
- } finally {
- cluster.shutdown();
}
- }
-
- @Test(timeout=60000)
- public void testBalanceDataBetweenMultiplePairsOfVolumes()
- throws Exception {
- Configuration conf = new HdfsConfiguration();
- final int DEFAULT_BLOCK_SIZE = 2048;
- conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
- final int NUM_DATANODES = 1;
- final long CAP = 512 * 1024;
- final Path testFile = new Path("/testfile");
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(NUM_DATANODES)
- .storageCapacities(new long[]{CAP, CAP, CAP, CAP})
- .storagesPerDatanode(4)
- .build();
- try {
- cluster.waitActive();
- DistributedFileSystem fs = cluster.getFileSystem();
- TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0);
-
- DFSTestUtil.waitReplication(fs, testFile, (short) 1);
- DataNode dnNode = cluster.getDataNodes().get(0);
- // Move data out of two volumes to make them empty.
- try (FsDatasetSpi.FsVolumeReferences refs =
- dnNode.getFSDataset().getFsVolumeReferences()) {
- assertEquals(4, refs.size());
- for (int i = 0; i < refs.size(); i += 2) {
- FsVolumeImpl source = (FsVolumeImpl) refs.get(i);
- FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1);
- assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
- DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
- source, dest);
- assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
- }
- }
-
- cluster.restartDataNodes();
- cluster.waitActive();
-
- // Start up a disk balancer and read the cluster info.
- final DataNode dataNode = cluster.getDataNodes().get(0);
- ClusterConnector nameNodeConnector =
- ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
- DiskBalancerCluster diskBalancerCluster =
- new DiskBalancerCluster(nameNodeConnector);
- diskBalancerCluster.readClusterInfo();
- List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
- // Rewrite the capacity in the model to show that disks need
- // re-balancing.
- setVolumeCapacity(diskBalancerCluster, CAP, "DISK");
- nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
- dataNode.getDatanodeUuid()));
- diskBalancerCluster.setNodesToProcess(nodesToProcess);
-
- // Compute a plan.
- List<NodePlan> clusterPlan = diskBalancerCluster.computePlan(10.0f);
-
- NodePlan plan = clusterPlan.get(0);
- assertEquals(2, plan.getVolumeSetPlans().size());
- plan.setNodeUUID(dnNode.getDatanodeUuid());
- plan.setTimeStamp(Time.now());
- String planJson = plan.toJson();
- String planID = DigestUtils.shaHex(planJson);
- dataNode.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
-
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- try {
- return dataNode.queryDiskBalancerPlan().getResult() ==
- DiskBalancerWorkStatus.Result.PLAN_DONE;
- } catch (IOException ex) {
- return false;
- }
- }
- }, 1000, 100000);
- assertEquals(dataNode.queryDiskBalancerPlan().getResult(),
+ /**
+ * Verifies the Plan Execution has been done.
+ */
+ public void verifyPlanExectionDone() throws IOException {
+ node = cluster.getDataNodes().get(dataNodeIndex);
+ assertEquals(node.queryDiskBalancerPlan().getResult(),
DiskBalancerWorkStatus.Result.PLAN_DONE);
+ }
+ /**
+ * Once diskBalancer is run, all volumes must have some data.
+ */
+ public void verifyAllVolumesHaveData() throws IOException {
+ node = cluster.getDataNodes().get(dataNodeIndex);
try (FsDatasetSpi.FsVolumeReferences refs =
- dataNode.getFSDataset().getFsVolumeReferences()) {
- for (FsVolumeSpi vol : refs) {
- assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0);
+ node.getFSDataset().getFsVolumeReferences()) {
+ for (FsVolumeSpi volume : refs) {
+ assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
+ LOG.info(refs.toString() + " : Block Count : {}",
+ DiskBalancerTestUtil.getBlockCount(volume));
}
}
- } finally {
- cluster.shutdown();
}
- }
- /**
- * Sets alll Disks capacity to size specified.
- *
- * @param cluster - DiskBalancerCluster
- * @param size - new size of the disk
- */
- private void setVolumeCapacity(DiskBalancerCluster cluster, long size,
- String diskType) {
- Preconditions.checkNotNull(cluster);
- for (DiskBalancerDataNode node : cluster.getNodes()) {
- for (DiskBalancerVolume vol :
- node.getVolumeSets().get(diskType).getVolumes()) {
- vol.setCapacity(size);
+ /**
+ * Verifies that tolerance values are honored correctly.
+ */
+ public void verifyTolerance(NodePlan plan, int planIndex, int
+ sourceDiskIndex, int tolerance) throws IOException {
+ // Tolerance
+ long delta = (plan.getVolumeSetPlans().get(planIndex).getBytesToMove()
+ * tolerance) / 100;
+ FsVolumeImpl volume = null;
+ try (FsDatasetSpi.FsVolumeReferences refs =
+ node.getFSDataset().getFsVolumeReferences()) {
+ volume = (FsVolumeImpl) refs.get(sourceDiskIndex);
+ assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
+
+ assertTrue(
+ (DiskBalancerTestUtil.getBlockCount(volume) *
+ (blockSize + delta)) >=
+ plan.getVolumeSetPlans().get(0).getBytesToMove());
}
- node.getVolumeSets().get(diskType).computeVolumeDataDensity();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
index c362f49..794a887 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
@@ -358,14 +358,13 @@ public class TestDiskBalancerWithMockMover {
private AtomicBoolean shouldRun;
private FsDatasetSpi dataset;
- private Integer runCount;
+ private int runCount;
private volatile boolean sleepInCopyBlocks;
private long delay;
public TestMover(FsDatasetSpi dataset) {
this.dataset = dataset;
this.shouldRun = new AtomicBoolean(false);
- this.runCount = new Integer(0);
}
public void setSleep() {
@@ -401,7 +400,7 @@ public class TestDiskBalancerWithMockMover {
if (delay > 0) {
Thread.sleep(delay);
}
- synchronized (runCount) {
+ synchronized (this) {
if (shouldRun()) {
runCount++;
}
@@ -461,9 +460,9 @@ public class TestDiskBalancerWithMockMover {
}
public int getRunCount() {
- synchronized (runCount) {
- LOG.info("Run count : " + runCount.intValue());
- return runCount.intValue();
+ synchronized (this) {
+ LOG.info("Run count : " + runCount);
+ return runCount;
}
}
}
@@ -510,7 +509,7 @@ public class TestDiskBalancerWithMockMover {
}
}
- private class DiskBalancerBuilder {
+ private static class DiskBalancerBuilder {
private TestMover blockMover;
private Configuration conf;
private String nodeID;
@@ -546,7 +545,7 @@ public class TestDiskBalancerWithMockMover {
}
}
- private class DiskBalancerClusterBuilder {
+ private static class DiskBalancerClusterBuilder {
private String jsonFilePath;
private Configuration conf;
@@ -573,7 +572,7 @@ public class TestDiskBalancerWithMockMover {
}
}
- private class PlanBuilder {
+ private static class PlanBuilder {
private String sourcePath;
private String destPath;
private String sourceUUID;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org