You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 06:05:53 UTC
[22/49] hadoop git commit: HDFS-9543. DiskBalancer: Add Data mover.
Contributed by Anu Engineer.
HDFS-9543. DiskBalancer: Add Data mover. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1594b472
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1594b472
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1594b472
Branch: refs/heads/trunk
Commit: 1594b472bb9df7537dbc001411c99058cc11ba41
Parents: 7820737
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Apr 28 16:12:04 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:20:24 2016 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/server/datanode/DataNode.java | 2 -
.../hdfs/server/datanode/DiskBalancer.java | 365 +++++++++++++++++--
.../datamodel/DiskBalancerDataNode.java | 13 +-
.../datamodel/DiskBalancerVolume.java | 6 +-
.../datamodel/DiskBalancerVolumeSet.java | 34 +-
.../server/diskbalancer/planner/MoveStep.java | 14 +-
.../hdfs/server/diskbalancer/planner/Step.java | 20 +-
.../hdfs/server/balancer/TestBalancer.java | 3 +-
.../server/diskbalancer/TestDiskBalancer.java | 247 +++++++++++++
.../hdfs/server/diskbalancer/TestPlanner.java | 28 +-
10 files changed, 666 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 776da3a..d6be2e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3324,8 +3324,6 @@ public class DataNode extends ReconfigurableBase
* @param planID - Hash value of the plan.
* @param planVersion - Plan version, reserved for future use. We have only
* version 1 now.
- * @param bandwidth - Max disk bandwidth to use, 0 means use value defined
- * in the configration.
* @param plan - Actual plan
* @throws IOException
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index b62a4fc..7f768ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -23,7 +23,9 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.DiskBalancerWorkEntry;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
+ .DiskBalancerWorkEntry;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -48,18 +52,21 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
/**
* Worker class for Disk Balancer.
- * <p/>
+ * <p>
* Here is the high level logic executed by this class. Users can submit disk
* balancing plans using submitPlan calls. After a set of sanity checks the plan
* is admitted and put into workMap.
- * <p/>
+ * <p>
* The executePlan launches a thread that picks up work from workMap and hands
* it over to the BlockMover#copyBlocks function.
- * <p/>
+ * <p>
* Constraints :
- * <p/>
+ * <p>
* Only one plan can be executing in a datanode at any given time. This is
* ensured by checking the future handle of the worker thread in submitPlan.
*/
@@ -127,11 +134,12 @@ public class DiskBalancer {
* Shutdown the executor.
*/
private void shutdownExecutor() {
+ final int secondsTowait = 10;
scheduler.shutdown();
try {
- if(!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+ if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
- if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+ if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
LOG.error("Disk Balancer : Scheduler did not terminate.");
}
}
@@ -207,6 +215,7 @@ public class DiskBalancer {
/**
* Cancels a running plan.
+ *
* @param planID - Hash of the plan to cancel.
* @throws DiskBalancerException
*/
@@ -297,7 +306,7 @@ public class DiskBalancer {
* @throws DiskBalancerException
*/
private NodePlan verifyPlan(String planID, long planVersion, String plan,
- boolean force) throws DiskBalancerException {
+ boolean force) throws DiskBalancerException {
Preconditions.checkState(lock.isHeldByCurrentThread());
verifyPlanVersion(planVersion);
@@ -372,8 +381,8 @@ public class DiskBalancer {
(TimeUnit.HOURS.toMillis(
DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS))) < now) {
String hourString = "Plan was generated more than " +
- Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
- + " hours ago.";
+ Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
+ + " hours ago.";
LOG.error("Disk Balancer - " + hourString);
throw new DiskBalancerException(hourString,
DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
@@ -484,14 +493,14 @@ public class DiskBalancer {
/**
* Insert work items to work map.
*
- * @param source - Source vol
- * @param dest - destination volume
- * @param step - Move Step
+ * @param source - Source vol
+ * @param dest - destination volume
+ * @param step - Move Step
*/
private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
Step step) throws DiskBalancerException {
- if(source.getStorageID().equals(dest.getStorageID())) {
+ if (source.getStorageID().equals(dest.getStorageID())) {
LOG.info("Disk Balancer - source & destination volumes are same.");
throw new DiskBalancerException("source and destination volumes are " +
"same.", DiskBalancerException.Result.INVALID_MOVE);
@@ -604,13 +613,15 @@ public class DiskBalancer {
/**
* Actual DataMover class for DiskBalancer.
- * <p/>
+ * <p>
*/
public static class DiskBalancerMover implements BlockMover {
private final FsDatasetSpi dataset;
private long diskBandwidth;
private long blockTolerance;
private long maxDiskErrors;
+ private int poolIndex;
+ private AtomicBoolean shouldRun;
/**
* Constructs diskBalancerMover.
@@ -620,6 +631,7 @@ public class DiskBalancer {
*/
public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
this.dataset = dataset;
+ shouldRun = new AtomicBoolean(false);
this.diskBandwidth = conf.getLong(
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
@@ -659,30 +671,333 @@ public class DiskBalancer {
}
/**
- * Copies blocks from a set of volumes.
- *
- * @param pair - Source and Destination Volumes.
- * @param item - Number of bytes to move from volumes.
+ * Sets Diskmover copyblocks into runnable state.
*/
@Override
- public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
-
+ public void setRunnable() {
+ this.shouldRun.set(true);
}
/**
- * Begin the actual copy operations. This is useful in testing.
+ * Signals copy block to exit.
*/
@Override
- public void setRunnable() {
+ public void setExitFlag() {
+ this.shouldRun.set(false);
+ }
+ /**
+ * Returns the shouldRun boolean flag.
+ */
+ public boolean shouldRun() {
+ return this.shouldRun.get();
}
/**
- * Tells copyBlocks to exit from the copy routine.
+ * Checks if a given block is less than needed size to meet our goal.
+ *
+ * @param blockSize - block len
+ * @param item - Work item
+ * @return true if this block meets our criteria, false otherwise.
+ */
+ private boolean isLessThanNeeded(long blockSize,
+ DiskBalancerWorkItem item) {
+ long bytesToCopy = item.getBytesToCopy() - item.getBytesCopied();
+ bytesToCopy = bytesToCopy +
+ ((bytesToCopy * getBlockTolerancePercentage(item)) / 100);
+ return (blockSize <= bytesToCopy) ? true : false;
+ }
+
+ /**
+ * Returns the default block tolerance if the plan does not have value of
+ * tolerance specified.
+ *
+ * @param item - DiskBalancerWorkItem
+ * @return long
+ */
+ private long getBlockTolerancePercentage(DiskBalancerWorkItem item) {
+ return item.getTolerancePercent() <= 0 ? this.blockTolerance :
+ item.getTolerancePercent();
+ }
+
+ /**
+ * Inflates bytesCopied and returns true or false. This allows us to stop
+ * copying if we have reached close enough.
+ *
+ * @param item DiskBalancerWorkItem
+ * @return -- false if we need to copy more, true if we are done
+ */
+ private boolean isCloseEnough(DiskBalancerWorkItem item) {
+ long temp = item.getBytesCopied() +
+ ((item.getBytesCopied() * getBlockTolerancePercentage(item)) / 100);
+ return (item.getBytesToCopy() >= temp) ? false : true;
+ }
+
+ /**
+ * Returns disk bandwidth associated with this plan, if none is specified
+ * returns the global default.
+ *
+ * @param item DiskBalancerWorkItem.
+ * @return MB/s - long
+ */
+ private long getDiskBandwidth(DiskBalancerWorkItem item) {
+ return item.getBandwidth() <= 0 ? this.diskBandwidth : item
+ .getBandwidth();
+ }
+
+ /**
+ * Computes sleep delay needed based on the block that just got copied. we
+ * copy using a burst mode, that is we let the copy proceed in full
+ * throttle. Once a copy is done, we compute how many bytes have been
+ * transferred and try to average it over the user specified bandwidth. In
+ * other words, This code implements a poor man's token bucket algorithm for
+ * traffic shaping.
+ *
+ * @param bytesCopied - byteCopied.
+ * @param timeUsed in milliseconds
+ * @param item DiskBalancerWorkItem
+ * @return sleep delay in Milliseconds.
+ */
+ private long computeDelay(long bytesCopied, long timeUsed,
+ DiskBalancerWorkItem item) {
+
+ // we had an overflow, ignore this reading and continue.
+ if (timeUsed == 0) {
+ return 0;
+ }
+ final int megaByte = 1024 * 1024;
+ long bytesInMB = bytesCopied / megaByte;
+ long lastThroughput = bytesInMB / SECONDS.convert(timeUsed,
+ TimeUnit.MILLISECONDS);
+ long delay = (bytesInMB / getDiskBandwidth(item)) - lastThroughput;
+ return (delay <= 0) ? 0 : MILLISECONDS.convert(delay, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Returns maximum errors to tolerate for the specific plan or the default.
+ *
+ * @param item - DiskBalancerWorkItem
+ * @return maximum error counts to tolerate.
+ */
+ private long getMaxError(DiskBalancerWorkItem item) {
+ return item.getMaxDiskErrors() <= 0 ? this.maxDiskErrors :
+ item.getMaxDiskErrors();
+ }
+
+ /**
+ * Gets the next block that we can copy, returns null if we cannot find a
+ * block that fits our parameters or if have run out of blocks.
+ *
+ * @param iter Block Iter
+ * @param item - Work item
+ * @return Extended block or null if no copyable block is found.
+ */
+ private ExtendedBlock getBlockToCopy(FsVolumeSpi.BlockIterator iter,
+ DiskBalancerWorkItem item) {
+ while (!iter.atEnd() && item.getErrorCount() < getMaxError(item)) {
+ try {
+ ExtendedBlock block = iter.nextBlock();
+
+ // A valid block is a finalized block, we iterate until we get
+ // finalized blocks
+ if (!this.dataset.isValidBlock(block)) {
+ continue;
+ }
+
+ // We don't look for the best, we just do first fit
+ if (isLessThanNeeded(block.getNumBytes(), item)) {
+ return block;
+ }
+
+ } catch (IOException e) {
+ item.incErrorCount();
+ }
+ }
+
+ if (item.getErrorCount() >= getMaxError(item)) {
+ item.setErrMsg("Error count exceeded.");
+ LOG.info("Maximum error count exceeded. Error count: {} Max error:{} "
+ , item.getErrorCount(), item.getMaxDiskErrors());
+ }
+
+ return null;
+ }
+
+ /**
+ * Opens all Block pools on a given volume.
+ *
+ * @param source Source
+ * @param poolIters List of PoolIters to maintain.
+ */
+ private void openPoolIters(FsVolumeSpi source, List<FsVolumeSpi
+ .BlockIterator> poolIters) {
+ Preconditions.checkNotNull(source);
+ Preconditions.checkNotNull(poolIters);
+
+ for (String blockPoolID : source.getBlockPoolList()) {
+ poolIters.add(source.newBlockIterator(blockPoolID,
+ "DiskBalancerSource"));
+ }
+ }
+
+ /**
+ * Returns the next block that we copy from all the block pools. This
+ * function looks across all block pools to find the next block to copy.
+ *
+ * @param poolIters - List of BlockIterators
+ * @return ExtendedBlock.
+ */
+ ExtendedBlock getNextBlock(List<FsVolumeSpi.BlockIterator> poolIters,
+ DiskBalancerWorkItem item) {
+ Preconditions.checkNotNull(poolIters);
+ int currentCount = 0;
+ ExtendedBlock block = null;
+ while (block == null && currentCount < poolIters.size()) {
+ currentCount++;
+ poolIndex = poolIndex++ % poolIters.size();
+ FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(poolIndex);
+ block = getBlockToCopy(currentPoolIter, item);
+ }
+
+ if (block == null) {
+ try {
+ item.setErrMsg("No source blocks found to move.");
+ LOG.error("No movable source blocks found. {}", item.toJson());
+ } catch (IOException e) {
+ LOG.error("Unable to get json from Item.");
+ }
+ }
+ return block;
+ }
+
+ /**
+ * Close all Pool Iters.
+ *
+ * @param poolIters List of BlockIters
+ */
+ private void closePoolIters(List<FsVolumeSpi.BlockIterator> poolIters) {
+ Preconditions.checkNotNull(poolIters);
+ for (FsVolumeSpi.BlockIterator iter : poolIters) {
+ try {
+ iter.close();
+ } catch (IOException ex) {
+ LOG.error("Error closing a block pool iter. ex: {}", ex);
+ }
+ }
+ }
+
+ /**
+ * Copies blocks from a set of volumes.
+ *
+ * @param pair - Source and Destination Volumes.
+ * @param item - Number of bytes to move from volumes.
*/
@Override
- public void setExitFlag() {
+ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
+ FsVolumeSpi source = pair.getSource();
+ FsVolumeSpi dest = pair.getDest();
+ List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
+ if (source.isTransientStorage() || dest.isTransientStorage()) {
+ return;
+ }
+
+ try {
+ openPoolIters(source, poolIters);
+ if (poolIters.size() == 0) {
+ LOG.error("No block pools found on volume. volume : {}. Exiting.",
+ source.getBasePath());
+ return;
+ }
+
+ while (shouldRun()) {
+ try {
+
+ // Check for the max error count constraint.
+ if (item.getErrorCount() > getMaxError(item)) {
+ LOG.error("Exceeded the max error count. source {}, dest: {} " +
+ "error count: {}", source.getBasePath(),
+ dest.getBasePath(), item.getErrorCount());
+ this.setExitFlag();
+ continue;
+ }
+
+ // Check for the block tolerance constraint.
+ if (isCloseEnough(item)) {
+ LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
+ "blocks.",
+ source.getBasePath(), dest.getBasePath(),
+ item.getBytesCopied(), item.getBlocksCopied());
+ this.setExitFlag();
+ continue;
+ }
+
+ ExtendedBlock block = getNextBlock(poolIters, item);
+ // we are not able to find any blocks to copy.
+ if (block == null) {
+ this.setExitFlag();
+ LOG.error("No source blocks, exiting the copy. Source: {}, " +
+ "dest:{}", source.getBasePath(), dest.getBasePath());
+ continue;
+ }
+
+ // check if someone told us exit, treat this as an interruption
+ // point
+ // for the thread, since both getNextBlock and moveBlocAcrossVolume
+ // can take some time.
+ if (!shouldRun()) {
+ continue;
+ }
+
+ long timeUsed;
+ // There is a race condition here, but we will get an IOException
+ // if dest has no space, which we handle anyway.
+ if (dest.getAvailable() > item.getBytesToCopy()) {
+ long begin = System.nanoTime();
+ this.dataset.moveBlockAcrossVolumes(block, dest);
+ long now = System.nanoTime();
+ timeUsed = (now - begin) > 0 ? now - begin : 0;
+ } else {
+
+ // Technically it is possible for us to find a smaller block and
+ // make another copy, but opting for the safer choice of just
+ // exiting here.
+ LOG.error("Destination volume: {} does not have enough space to" +
+ " accommodate a block. Block Size: {} Exiting from" +
+ " copyBlocks.", dest.getBasePath(), block.getNumBytes());
+ this.setExitFlag();
+ continue;
+ }
+
+ LOG.debug("Moved block with size {} from {} to {}",
+ block.getNumBytes(), source.getBasePath(),
+ dest.getBasePath());
+
+ item.incCopiedSoFar(block.getNumBytes());
+ item.incBlocksCopied();
+
+ // Check for the max throughput constraint.
+ // We sleep here to keep the promise that we will not
+ // copy more than Max MB/sec. we sleep enough time
+ // to make sure that our promise is good on average.
+ // Because we sleep, if a shutdown or cancel call comes in
+ // we exit via Thread Interrupted exception.
+ Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
+
+ } catch (IOException ex) {
+ LOG.error("Exception while trying to copy blocks. error: {}", ex);
+ item.incErrorCount();
+ } catch (InterruptedException e) {
+ LOG.error("Copy Block Thread interrupted, exiting the copy.");
+ Thread.currentThread().interrupt();
+ item.incErrorCount();
+ this.setExitFlag();
+ }
+ }
+ } finally {
+ // Close all Iters.
+ closePoolIters(poolIters);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
index 87030db..f70a983 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
@@ -28,7 +28,7 @@ import java.util.Map;
* between a set of Nodes.
*/
public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
- private float nodeDataDensity;
+ private double nodeDataDensity;
private Map<String, DiskBalancerVolumeSet> volumeSets;
private String dataNodeUUID;
private String dataNodeIP;
@@ -159,17 +159,17 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
public int compareTo(DiskBalancerDataNode that) {
Preconditions.checkNotNull(that);
- if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+ if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
< 0) {
return -1;
}
- if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+ if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
== 0) {
return 0;
}
- if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
+ if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
> 0) {
return 1;
}
@@ -190,7 +190,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
*
* @return float
*/
- public float getNodeDataDensity() {
+ public double getNodeDataDensity() {
return nodeDataDensity;
}
@@ -201,7 +201,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
* spread across a set of volumes inside the node.
*/
public void computeNodeDensity() {
- float sum = 0;
+ double sum = 0;
int volcount = 0;
for (DiskBalancerVolumeSet vset : volumeSets.values()) {
for (DiskBalancerVolume vol : vset.getVolumes()) {
@@ -249,6 +249,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
vSet = volumeSets.get(volumeSetKey);
} else {
vSet = new DiskBalancerVolumeSet(volume.isTransient());
+ vSet.setStorageType(volumeSetKey);
volumeSets.put(volumeSetKey, vSet);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
index 24e891f..2a39609 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
@@ -38,7 +38,7 @@ public class DiskBalancerVolume {
private String uuid;
private boolean failed;
private boolean isTransient;
- private float volumeDataDensity;
+ private double volumeDataDensity;
private boolean skip = false;
private boolean isReadOnly;
@@ -69,7 +69,7 @@ public class DiskBalancerVolume {
*
* @return float.
*/
- public float getVolumeDataDensity() {
+ public double getVolumeDataDensity() {
return volumeDataDensity;
}
@@ -78,7 +78,7 @@ public class DiskBalancerVolume {
*
* @param volDataDensity - density
*/
- public void setVolumeDataDensity(float volDataDensity) {
+ public void setVolumeDataDensity(double volDataDensity) {
this.volumeDataDensity = volDataDensity;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
index 2faf249..70d7536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
@@ -53,7 +53,7 @@ public class DiskBalancerVolumeSet {
private String storageType;
private String setID;
- private float idealUsed;
+ private double idealUsed;
/**
@@ -142,19 +142,32 @@ public class DiskBalancerVolumeSet {
}
if (totalCapacity != 0) {
- this.idealUsed = totalUsed / (float) totalCapacity;
+ this.idealUsed = truncateDecimals(totalUsed /
+ (double) totalCapacity);
}
for (DiskBalancerVolume volume : volumes) {
if (!volume.isFailed() && !volume.isSkip()) {
- float dfsUsedRatio =
- volume.getUsed() / (float) volume.computeEffectiveCapacity();
+ double dfsUsedRatio =
+ truncateDecimals(volume.getUsed() /
+ (double) volume.computeEffectiveCapacity());
+
volume.setVolumeDataDensity(this.idealUsed - dfsUsedRatio);
sortedQueue.add(volume);
}
}
}
+ /**
+ * Truncate to 4 digits since uncontrolled precision is some times
+ * counter intitive to what users expect.
+ * @param value - double.
+ * @return double.
+ */
+ private double truncateDecimals(double value) {
+ final int multiplier = 10000;
+ return (double) ((long) (value * multiplier)) / multiplier;
+ }
private void skipMisConfiguredVolume(DiskBalancerVolume volume) {
//probably points to some sort of mis-configuration. Log this and skip
// processing this volume.
@@ -255,7 +268,7 @@ public class DiskBalancerVolumeSet {
* @return true if balancing is needed false otherwise.
*/
public boolean isBalancingNeeded(float thresholdPercentage) {
- float threshold = thresholdPercentage / 100.0f;
+ double threshold = thresholdPercentage / 100.0d;
if(volumes == null || volumes.size() <= 1) {
// there is nothing we can do with a single volume.
@@ -265,7 +278,10 @@ public class DiskBalancerVolumeSet {
for (DiskBalancerVolume vol : volumes) {
boolean notSkip = !vol.isFailed() && !vol.isTransient() && !vol.isSkip();
- if ((Math.abs(vol.getVolumeDataDensity()) > threshold) && notSkip) {
+ Double absDensity =
+ truncateDecimals(Math.abs(vol.getVolumeDataDensity()));
+
+ if ((absDensity > threshold) && notSkip) {
return true;
}
}
@@ -306,7 +322,7 @@ public class DiskBalancerVolumeSet {
*/
@JsonIgnore
- public float getIdealUsed() {
+ public double getIdealUsed() {
return this.idealUsed;
}
@@ -319,8 +335,8 @@ public class DiskBalancerVolumeSet {
*/
@Override
public int compare(DiskBalancerVolume first, DiskBalancerVolume second) {
- return Float
- .compare(second.getVolumeDataDensity(), first.getVolumeDataDensity());
+ return Double.compare(second.getVolumeDataDensity(),
+ first.getVolumeDataDensity());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
index 9a493a5..b5f68fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java
@@ -38,7 +38,7 @@ import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude;
public class MoveStep implements Step {
private DiskBalancerVolume sourceVolume;
private DiskBalancerVolume destinationVolume;
- private float idealStorage;
+ private double idealStorage;
private long bytesToMove;
private String volumeSetID;
@@ -55,7 +55,7 @@ public class MoveStep implements Step {
* @param bytesToMove - number of bytes to move
* @param volumeSetID - a diskBalancer generated id.
*/
- public MoveStep(DiskBalancerVolume sourceVolume, float idealStorage,
+ public MoveStep(DiskBalancerVolume sourceVolume, double idealStorage,
DiskBalancerVolume destinationVolume, long bytesToMove,
String volumeSetID) {
this.destinationVolume = destinationVolume;
@@ -98,7 +98,7 @@ public class MoveStep implements Step {
* @return float
*/
@Override
- public float getIdealStorage() {
+ public double getIdealStorage() {
return idealStorage;
}
@@ -146,7 +146,7 @@ public class MoveStep implements Step {
*
* @param idealStorage - ideal Storage
*/
- public void setIdealStorage(float idealStorage) {
+ public void setIdealStorage(double idealStorage) {
this.idealStorage = idealStorage;
}
@@ -199,6 +199,7 @@ public class MoveStep implements Step {
* move operation is aborted.
* @return long.
*/
+ @Override
public long getMaxDiskErrors() {
return maxDiskErrors;
}
@@ -208,6 +209,7 @@ public class MoveStep implements Step {
* step is aborted.
* @param maxDiskErrors - long
*/
+ @Override
public void setMaxDiskErrors(long maxDiskErrors) {
this.maxDiskErrors = maxDiskErrors;
}
@@ -223,6 +225,7 @@ public class MoveStep implements Step {
*
* @return tolerance percentage.
*/
+ @Override
public long getTolerancePercent() {
return tolerancePercent;
}
@@ -231,6 +234,7 @@ public class MoveStep implements Step {
* Sets the tolerance percentage.
* @param tolerancePercent - long
*/
+ @Override
public void setTolerancePercent(long tolerancePercent) {
this.tolerancePercent = tolerancePercent;
}
@@ -241,6 +245,7 @@ public class MoveStep implements Step {
* datanode while data node is in use.
* @return long.
*/
+ @Override
public long getBandwidth() {
return bandwidth;
}
@@ -250,6 +255,7 @@ public class MoveStep implements Step {
* @param bandwidth - Long, MB / Sec of data to be moved between
* source and destinatin volume.
*/
+ @Override
public void setBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
index f13909f..8f69653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java
@@ -42,7 +42,7 @@ public interface Step {
*
* @return idealStorage
*/
- float getIdealStorage();
+ double getIdealStorage();
/**
* Gets Source Volume.
@@ -87,5 +87,23 @@ public interface Step {
*/
long getBandwidth();
+ /**
+ * Sets Tolerance percent on a specific step.
+ * @param tolerancePercent - tolerance in percentage.
+ */
+ void setTolerancePercent(long tolerancePercent);
+
+ /**
+ * Set Bandwidth on a specific step.
+ * @param bandwidth - in MB/s
+ */
+ void setBandwidth(long bandwidth);
+
+ /**
+ * Set maximum errors to tolerate before disk balancer step fails.
+ * @param maxDiskErrors - error count.
+ */
+ void setMaxDiskErrors(long maxDiskErrors);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 738cfe6..1cfd488 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -264,7 +264,8 @@ public class TestBalancer {
}
/* create a file with a length of <code>fileLen</code> */
- static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
+ public static void createFile(MiniDFSCluster cluster, Path filePath, long
+ fileLen,
short replicationFactor, int nnIndex)
throws IOException, InterruptedException, TimeoutException {
FileSystem fs = cluster.getFileSystem(nnIndex);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
new file mode 100644
index 0000000..f50637c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestDiskBalancer {
+
+ @Test
+ public void TestDiskBalancerNameNodeConnectivity() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+ final int numDatanodes = 2;
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDatanodes).build();
+ try {
+ cluster.waitActive();
+ ClusterConnector nameNodeConnector =
+ ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+ DiskBalancerCluster DiskBalancerCluster = new DiskBalancerCluster
+ (nameNodeConnector);
+ DiskBalancerCluster.readClusterInfo();
+ assertEquals(DiskBalancerCluster.getNodes().size(), numDatanodes);
+ DataNode dnNode = cluster.getDataNodes().get(0);
+ DiskBalancerDataNode dbDnNode =
+ DiskBalancerCluster.getNodeByUUID(dnNode.getDatanodeUuid());
+ assertEquals(dnNode.getDatanodeUuid(), dbDnNode.getDataNodeUUID());
+ assertEquals(dnNode.getDatanodeId().getIpAddr(),
+ dbDnNode.getDataNodeIP());
+ assertEquals(dnNode.getDatanodeId().getHostName(),
+ dbDnNode.getDataNodeName());
+ try (FsDatasetSpi.FsVolumeReferences ref = dnNode.getFSDataset()
+ .getFsVolumeReferences()) {
+ assertEquals(ref.size(), dbDnNode.getVolumeCount());
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * This test simulates a real Data node working with DiskBalancer.
+ *
+ * Here is the overview of this test.
+ *
+ * 1. Write a bunch of blocks and move them to one disk to create imbalance.
+ * 2. Rewrite the capacity of the disks in DiskBalancer Model so that
+ * planner will produce a move plan.
+ * 3. Execute the move plan and wait unitl the plan is done.
+ * 4. Verify the source disk has blocks now.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void TestDiskBalancerEndToEnd() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ final int DEFAULT_BLOCK_SIZE = 100;
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ final int numDatanodes = 1;
+ final String fileName = "/tmp.txt";
+ final Path filePath = new Path(fileName);
+ final int blocks = 100;
+ final int blocksSize = 1024;
+ final int fileLen = blocks * blocksSize;
+
+
+ // Write a file and restart the cluster
+ long [] capacities = new long[]{ DEFAULT_BLOCK_SIZE * 2 * fileLen,
+ DEFAULT_BLOCK_SIZE * 2 * fileLen };
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDatanodes)
+ .storageCapacities(capacities)
+ .storageTypes(new StorageType[] {StorageType.DISK, StorageType.DISK})
+ .storagesPerDatanode(2)
+ .build();
+ FsVolumeImpl source = null;
+ FsVolumeImpl dest = null;
+ try {
+ cluster.waitActive();
+ Random r = new Random();
+ FileSystem fs = cluster.getFileSystem(0);
+ TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
+ numDatanodes - 1);
+
+ DFSTestUtil.waitReplication(fs, filePath, (short) 1);
+ cluster.restartDataNodes();
+ cluster.waitActive();
+
+ // Get the data node and move all data to one disk.
+ DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1);
+ try (FsDatasetSpi.FsVolumeReferences refs =
+ dnNode.getFSDataset().getFsVolumeReferences()) {
+ source = (FsVolumeImpl) refs.get(0);
+ dest = (FsVolumeImpl) refs.get(1);
+ assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+ DiskBalancerTestUtil.moveAllDataToDestVolume(
+ dnNode.getFSDataset(), source, dest);
+ assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+ }
+
+ cluster.restartDataNodes();
+ cluster.waitActive();
+
+ // Start up a disk balancer and read the cluster info.
+ final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1);
+ ClusterConnector nameNodeConnector =
+ ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+ DiskBalancerCluster diskBalancerCluster =
+ new DiskBalancerCluster(nameNodeConnector);
+ diskBalancerCluster.readClusterInfo();
+ List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
+
+ // Rewrite the capacity in the model to show that disks need
+ // re-balancing.
+ setVolumeCapacity(diskBalancerCluster, DEFAULT_BLOCK_SIZE * 2 * fileLen, "DISK");
+ // Pick a node to process.
+ nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode
+ .getDatanodeUuid()));
+ diskBalancerCluster.setNodesToProcess(nodesToProcess);
+
+ // Compute a plan.
+ List<NodePlan> clusterplan = diskBalancerCluster.computePlan(0.0f);
+
+ // Now we must have a plan,since the node is imbalanced and we
+ // asked the disk balancer to create a plan.
+ assertTrue(clusterplan.size() == 1);
+
+ NodePlan plan = clusterplan.get(0);
+ plan.setNodeUUID(dnNode.getDatanodeUuid());
+ plan.setTimeStamp(Time.now());
+ String planJson = plan.toJson();
+ String planID = DigestUtils.sha512Hex(planJson);
+ assertNotNull(plan.getVolumeSetPlans());
+ assertTrue(plan.getVolumeSetPlans().size() > 0);
+ plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
+
+
+ // Submit the plan and wait till the execution is done.
+ newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ return newDN.queryDiskBalancerPlan().getResult() ==
+ DiskBalancerWorkStatus.Result.PLAN_DONE;
+ } catch (IOException ex) {
+ return false;
+ }
+ }
+ }, 1000, 100000);
+
+
+ //verify that it worked.
+ dnNode = cluster.getDataNodes().get(numDatanodes - 1);
+ assertEquals(dnNode.queryDiskBalancerPlan().getResult(),
+ DiskBalancerWorkStatus.Result.PLAN_DONE);
+ try (FsDatasetSpi.FsVolumeReferences refs =
+ dnNode.getFSDataset().getFsVolumeReferences()) {
+ source = (FsVolumeImpl) refs.get(0);
+ assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+ }
+
+
+
+ // Tolerance
+ long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove()
+ * 10) / 100;
+ assertTrue(
+ (DiskBalancerTestUtil.getBlockCount(source) *
+ DEFAULT_BLOCK_SIZE + delta) >=
+ plan.getVolumeSetPlans().get(0).getBytesToMove());
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Sets alll Disks capacity to size specified.
+ * @param cluster - DiskBalancerCluster
+ * @param size - new size of the disk
+ */
+ private void setVolumeCapacity(DiskBalancerCluster cluster, long size,
+ String diskType) {
+ Preconditions.checkNotNull(cluster);
+ for(DiskBalancerDataNode node : cluster.getNodes()) {
+ for (DiskBalancerVolume vol :
+ node.getVolumeSets().get(diskType).getVolumes()) {
+ vol.setCapacity(size);
+ }
+ node.getVolumeSets().get(diskType).computeVolumeDataDensity();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1594b472/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
index ad18075..77c2aa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java
@@ -21,11 +21,9 @@ import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
- .DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
- .DiskBalancerVolumeSet;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
@@ -143,9 +141,9 @@ public class TestPlanner {
cluster.readClusterInfo();
Assert.assertEquals(1, cluster.getNodes().size());
- GreedyPlanner planner = new GreedyPlanner(10.0f, node);
- NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
- ());
+ GreedyPlanner planner = new GreedyPlanner(5.0f, node);
+ NodePlan plan = new NodePlan(node.getDataNodeUUID(),
+ node.getDataNodePort());
planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
// We should have only one planned move from
@@ -184,8 +182,8 @@ public class TestPlanner {
cluster.readClusterInfo();
Assert.assertEquals(1, cluster.getNodes().size());
- GreedyPlanner planner = new GreedyPlanner(10.0f, node);
- NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
+ GreedyPlanner planner = new GreedyPlanner(5.0f, node);
+ NodePlan plan = new NodePlan(node.getDataNodeUUID(), node.getDataNodePort
());
planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
@@ -262,11 +260,10 @@ public class TestPlanner {
assertEquals(2, plan.getVolumeSetPlans().size());
Step step = plan.getVolumeSetPlans().get(0);
assertEquals("volume100", step.getSourceVolume().getPath());
- assertEquals("33.3 G", step.getSizeString(step.getBytesToMove()));
-
+ assertTrue(step.getSizeString(step.getBytesToMove()).matches("33.[2|3|4] G"));
step = plan.getVolumeSetPlans().get(1);
assertEquals("volume100", step.getSourceVolume().getPath());
- assertEquals("33.3 G", step.getSizeString(step.getBytesToMove()));
+ assertTrue(step.getSizeString(step.getBytesToMove()).matches("33.[2|3|4] G"));
}
@Test
@@ -318,11 +315,12 @@ public class TestPlanner {
Step step = newPlan.getVolumeSetPlans().get(0);
assertEquals("volume100", step.getSourceVolume().getPath());
- assertEquals("18.8 G", step.getSizeString(step.getBytesToMove()));
+ assertTrue(step.getSizeString(step.getBytesToMove()).matches("18.[6|7|8] G"));
step = newPlan.getVolumeSetPlans().get(1);
assertEquals("volume100", step.getSourceVolume().getPath());
- assertEquals("18.8 G", step.getSizeString(step.getBytesToMove()));
+ assertTrue(step.getSizeString(step.getBytesToMove()).matches("18.[6|7|8] G"));
+
}
@Test
@@ -364,7 +362,7 @@ public class TestPlanner {
if (step.getDestinationVolume().getPath().equals("volume0-1")) {
assertEquals("volume100", step.getSourceVolume().getPath());
- assertEquals("28.6 G",
+ assertEquals("28.5 G",
step.getSizeString(step.getBytesToMove()));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org