You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/12/16 09:28:21 UTC
svn commit: r1422482 - in /hadoop/common/branches/branch-1: ./
src/core/org/apache/hadoop/net/
src/hdfs/org/apache/hadoop/hdfs/server/balancer/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/balancer/
Author: szetszwo
Date: Sun Dec 16 08:28:17 2012
New Revision: 1422482
URL: http://svn.apache.org/viewvc?rev=1422482&view=rev
Log:
HDFS-3942. Backport HDFS-3495 and HDFS-4234: Update Balancer to support new NetworkTopology with NodeGroup and use generic code for choosing datanode in Balancer. Contributed by Junping Du
Added:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sun Dec 16 08:28:17 2012
@@ -51,6 +51,10 @@ Release 1.2.0 - unreleased
HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat
(Ahmed Radwan, backported by suresh)
+ HDFS-3942. Backport HDFS-3495 and HDFS-4234: Update Balancer to support new
+ NetworkTopology with NodeGroup and use generic code for choosing datanode
+ in Balancer. (Junping Du via szetszwo)
+
IMPROVEMENTS
HDFS-3515. Port HDFS-1457 to branch-1. (eli)
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Sun Dec 16 08:28:17 2012
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
/** The class represents a cluster of computer with a tree hierarchical
* network topology.
@@ -341,6 +343,19 @@ public class NetworkTopology {
public NetworkTopology() {
clusterMap = new InnerNode(InnerNode.ROOT);
}
+
+ /**
+ * Get an instance of NetworkTopology based on the value of the configuration
+ * parameter net.topology.impl.
+ *
+ * @param conf the configuration to be used
+ * @return an instance of NetworkTopology
+ */
+ public static NetworkTopology getInstance(Configuration conf){
+ return (NetworkTopology) ReflectionUtils.newInstance(
+ conf.getClass("net.topology.impl", NetworkTopology.class,
+ NetworkTopology.class), conf);
+ }
/** Add a leaf node
* Update node counter & rack counter if neccessary
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sun Dec 16 08:28:17 2012
@@ -60,7 +60,6 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
@@ -79,6 +78,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
@@ -173,7 +173,7 @@ import org.apache.hadoop.util.ToolRunner
* <ol>
* <li>The cluster is balanced. Exiting
* <li>No block can be moved. Exiting...
- * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
* <li>Received an IO exception: failure reason. Exiting...
* <li>Another balancer is running. Exiting...
* </ol>
@@ -227,7 +227,7 @@ public class Balancer implements Tool {
private Map<String, BalancerDatanode> datanodes
= new HashMap<String, BalancerDatanode>();
- private NetworkTopology cluster = new NetworkTopology();
+ private NetworkTopology cluster;
private double avgUtilization = 0.0D;
@@ -542,7 +542,7 @@ public class Balancer implements Tool {
}
/** Decide if still need to move more bytes */
- protected boolean isMoveQuotaFull() {
+ protected boolean hasSpaceForScheduling() {
return scheduledSize<maxSizeToMove;
}
@@ -746,8 +746,8 @@ public class Balancer implements Tool {
long startTime = Util.now();
this.blocksToReceive = 2*scheduledSize;
boolean isTimeUp = false;
- while(!isTimeUp && scheduledSize>0 &&
- (!srcBlockList.isEmpty() || blocksToReceive>0)) {
+ while(!isTimeUp && scheduledSize > 0 &&
+ (!srcBlockList.isEmpty() || blocksToReceive > 0)) {
PendingBlockMove pendingBlock = chooseNextBlockToMove();
if (pendingBlock != null) {
// move the block
@@ -796,7 +796,8 @@ public class Balancer implements Tool {
*/
private void checkReplicationPolicyCompatibility(Configuration conf)
throws UnsupportedActionException {
- if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != BlockPlacementPolicyDefault.class) {
+ if (!(BlockPlacementPolicy.getInstance(conf, null, null)
+ instanceof BlockPlacementPolicyDefault)) {
throw new UnsupportedActionException(
"Balancer without BlockPlacementPolicyDefault");
}
@@ -1065,6 +1066,36 @@ public class Balancer implements Tool {
LOG.info(msg);
}
+ /** A matcher interface for matching nodes. */
+ private interface Matcher {
+ /** Given the cluster topology, does the left node match the right node? */
+ boolean match(NetworkTopology cluster, Node left, Node right);
+ }
+
+ /** Match datanodes in the same node group. */
+ static final Matcher SAME_NODE_GROUP = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return cluster.isOnSameNodeGroup(left, right);
+ }
+ };
+
+ /** Match datanodes in the same rack. */
+ static final Matcher SAME_RACK = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return cluster.isOnSameRack(left, right);
+ }
+ };
+
+ /** Match any datanode with any other datanode. */
+ static final Matcher ANY_OTHER = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return left != right;
+ }
+ };
+
/* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target
* Maximum bytes to be moved per node is
@@ -1072,10 +1103,15 @@ public class Balancer implements Tool {
* Return total number of bytes to move in this iteration
*/
private long chooseNodes() {
- // Match nodes on the same rack first
- chooseNodes(true);
- // Then match nodes on different racks
- chooseNodes(false);
+ // First, match nodes on the same node group if cluster is node group aware
+ if (cluster.isNodeGroupAware()) {
+ chooseNodes(SAME_NODE_GROUP);
+ }
+
+ // Then, match nodes on the same rack
+ chooseNodes(SAME_RACK);
+ // Finally, match all remaining nodes
+ chooseNodes(ANY_OTHER);
assert (datanodes.size() ==
overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -1089,167 +1125,99 @@ public class Balancer implements Tool {
}
return bytesToMove;
}
-
- /* if onRack is true, decide all <source, target> pairs
- * where source and target are on the same rack; Otherwise
- * decide all <source, target> pairs where source and target are
- * on different racks
- */
- private void chooseNodes(boolean onRack) {
+
+ /** Decide all <source, target> pairs according to the matcher. */
+ private void chooseNodes(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
- chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+ chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
- chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+ chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
- /* match each remaining underutilized datanode to
- * above average utilized datanodes.
+ /* match each remaining underutilized datanode (target) to
+ * above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
- chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+ chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
}
-
- /* choose targets from the target candidate list for each over utilized
- * source datanode. OnRackTarget determines if the chosen target
- * should be on the same rack as the source
+
+ /**
+ * For each datanode, choose matching nodes from the candidates. Either the
+ * datanodes or the candidates are source nodes with (utilization > Avg), and
+ * the others are target nodes with (utilization < Avg).
*/
- private void chooseTargets(
- Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
- for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
- srcIterator.hasNext();) {
- Source source = srcIterator.next();
- while (chooseTarget(source, targetCandidates, onRackTarget)) {
- }
- if (!source.isMoveQuotaFull()) {
- srcIterator.remove();
+ private <D extends BalancerDatanode, C extends BalancerDatanode> void
+ chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+ Matcher matcher) {
+ for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
+ final D datanode = i.next();
+ for(; chooseForOneDatanode(datanode, candidates, matcher); );
+ if (!datanode.hasSpaceForScheduling()) {
+ i.remove();
}
}
- return;
}
- /* choose sources from the source candidate list for each under utilized
- * target datanode. onRackSource determines if the chosen source
- * should be on the same rack as the target
+ /**
+ * For the given datanode, choose a candidate and then schedule it.
+ * @return true if a candidate is chosen; false if no candidate is chosen.
*/
- private void chooseSources(
- Iterator<Source> sourceCandidates, boolean onRackSource) {
- for (Iterator<BalancerDatanode> targetIterator =
- underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
- BalancerDatanode target = targetIterator.next();
- while (chooseSource(target, sourceCandidates, onRackSource)) {
- }
- if (!target.isMoveQuotaFull()) {
- targetIterator.remove();
- }
- }
- return;
- }
+ private <C extends BalancerDatanode> boolean chooseForOneDatanode(
+ BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+ final Iterator<C> i = candidates.iterator();
+ final C chosen = chooseCandidate(dn, i, matcher);
- /* For the given source, choose targets from the target candidate list.
- * OnRackTarget determines if the chosen target
- * should be on the same rack as the source
- */
- private boolean chooseTarget(Source source,
- Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
- if (!source.isMoveQuotaFull()) {
+ if (chosen == null) {
return false;
}
- boolean foundTarget = false;
- BalancerDatanode target = null;
- while (!foundTarget && targetCandidates.hasNext()) {
- target = targetCandidates.next();
- if (!target.isMoveQuotaFull()) {
- targetCandidates.remove();
- continue;
- }
- if (onRackTarget) {
- // choose from on-rack nodes
- if (cluster.isOnSameRack(source.datanode, target.datanode)) {
- foundTarget = true;
- }
- } else {
- // choose from off-rack nodes
- if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
- foundTarget = true;
- }
- }
+ if (dn instanceof Source) {
+ matchSourceWithTargetToMove((Source)dn, chosen);
+ } else {
+ matchSourceWithTargetToMove((Source)chosen, dn);
}
- if (foundTarget) {
- assert(target != null):"Choose a null target";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
- if (!target.isMoveQuotaFull()) {
- targetCandidates.remove();
- }
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode.getName() + " to " + target.datanode.getName());
- return true;
+ if (!chosen.hasSpaceForScheduling()) {
+ i.remove();
}
- return false;
+ return true;
}
- /* For the given target, choose sources from the source candidate list.
- * OnRackSource determines if the chosen source
- * should be on the same rack as the target
- */
- private boolean chooseSource(BalancerDatanode target,
- Iterator<Source> sourceCandidates, boolean onRackSource) {
- if (!target.isMoveQuotaFull()) {
- return false;
- }
- boolean foundSource = false;
- Source source = null;
- while (!foundSource && sourceCandidates.hasNext()) {
- source = sourceCandidates.next();
- if (!source.isMoveQuotaFull()) {
- sourceCandidates.remove();
- continue;
- }
- if (onRackSource) {
- // choose from on-rack nodes
- if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
- foundSource = true;
- }
- } else {
- // choose from off-rack nodes
- if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
- foundSource = true;
+ private void matchSourceWithTargetToMove(
+ Source source, BalancerDatanode target) {
+ long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ }
+
+ /** Choose a candidate for the given datanode. */
+ private <D extends BalancerDatanode, C extends BalancerDatanode>
+ C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
+ if (dn.hasSpaceForScheduling()) {
+ for(; candidates.hasNext(); ) {
+ final C c = candidates.next();
+ if (!c.hasSpaceForScheduling()) {
+ candidates.remove();
+ } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+ return c;
}
}
}
- if (foundSource) {
- assert(source != null):"Choose a null source";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
- if ( !source.isMoveQuotaFull()) {
- sourceCandidates.remove();
- }
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode.getName() + " to " + target.datanode.getName());
- return true;
- }
- return false;
+ return null;
}
private static class BytesMoved {
- private long bytesMoved = 0L;;
+ private long bytesMoved = 0L;
private synchronized void inc( long bytes ) {
bytesMoved += bytes;
}
@@ -1391,6 +1359,11 @@ public class Balancer implements Tool {
if (block.isLocatedOnDatanode(target)) {
return false;
}
+
+ if (cluster.isNodeGroupAware() &&
+ isOnSameNodeGroupWithReplicas(target, block, source)) {
+ return false;
+ }
boolean goodBlock = false;
if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@@ -1423,16 +1396,38 @@ public class Balancer implements Tool {
return goodBlock;
}
+ /**
+ * Check whether any replica (other than the one on source) is on the same
+ * node group as target. If so, target is not a good candidate for placing
+ * this block replica, as we don't want 2 replicas under the same node group
+ * after balancing.
+ * @param target the target datanode
+ * @param block the data block
+ * @param source the source datanode
+ * @return true if any replica (other than the one on source) is on the same
+ * node group as target
+ */
+ private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+ BalancerBlock block, Source source) {
+ for (BalancerDatanode loc : block.locations) {
+ if (loc != source &&
+ cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/* reset all fields in a balancer preparing for the next iteration */
private void resetData() {
- this.cluster = new NetworkTopology();
+ this.cluster = NetworkTopology.getInstance(conf);
this.overUtilizedDatanodes.clear();
this.aboveAvgUtilizedDatanodes.clear();
this.belowAvgUtilizedDatanodes.clear();
this.underUtilizedDatanodes.clear();
this.datanodes.clear();
this.sources.clear();
- this.targets.clear();
+ this.targets.clear();
this.avgUtilization = 0.0D;
cleanGlobalBlockList();
this.movedBlocks.cleanup();
@@ -1505,7 +1500,7 @@ public class Balancer implements Tool {
Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
int iterations = 0;
- while (true ) {
+ while (true) {
/* get all live datanodes of a cluster and their disk usage
* decide the number of bytes need to be moved
*/
@@ -1655,6 +1650,7 @@ public class Balancer implements Tool {
/** set this balancer's configuration */
public void setConf(Configuration conf) {
this.conf = conf;
+ this.cluster = NetworkTopology.getInstance(conf);
movedBlocks.setWinWidth(conf);
}
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Dec 16 08:28:17 2012
@@ -524,9 +524,7 @@ public class FSNamesystem implements FSC
DFSUtil.getInvalidateWorkPctPerIteration(conf);
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
- this.clusterMap = (NetworkTopology) ReflectionUtils.newInstance(
- conf.getClass("net.topology.impl", NetworkTopology.class,
- NetworkTopology.class), conf);
+ this.clusterMap = NetworkTopology.getInstance(conf);
this.maxCorruptFilesReturned = conf.getInt(
DFSConfigKeys.DFS_MAX_CORRUPT_FILES_RETURNED_KEY,
@@ -3863,7 +3861,6 @@ public class FSNamesystem implements FSC
// Is the block being reported the last block of an underconstruction file?
boolean blockUnderConstruction = false;
if (fileINode.isUnderConstruction()) {
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
Block last = fileINode.getLastBlock();
if (last == null) {
// This should never happen, but better to handle it properly than to throw
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Sun Dec 16 08:28:17 2012
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -69,13 +70,12 @@ public class MiniDFSCluster {
}
private Configuration conf;
- private NameNode nameNode;
- private int numDataNodes;
- private ArrayList<DataNodeProperties> dataNodes =
+ protected NameNode nameNode;
+ protected int numDataNodes;
+ protected List<DataNodeProperties> dataNodes =
new ArrayList<DataNodeProperties>();
private File base_dir;
- private File data_dir;
-
+ protected File data_dir;
/**
* This null constructor is used only when wishing to start a data node cluster
@@ -439,8 +439,6 @@ public class MiniDFSCluster {
waitActive();
}
-
-
/**
* Modify the config and start up the DataNodes. The info port for
* DataNodes is guaranteed to use a free port.
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java?rev=1422482&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java Sun Dec 16 08:28:17 2012
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+
+public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
+
+ private static String[] NODE_GROUPS = null;
+ private static final Log LOG = LogFactory.getLog(MiniDFSClusterWithNodeGroup.class);
+
+ public MiniDFSClusterWithNodeGroup(int nameNodePort,
+ Configuration conf,
+ int numDataNodes,
+ boolean format,
+ boolean manageDfsDirs,
+ StartupOption operation,
+ String[] racks,
+ long[] simulatedCapacities) throws IOException {
+ super(nameNodePort, conf, numDataNodes, format, manageDfsDirs,
+ manageDfsDirs, operation, racks, null, simulatedCapacities);
+ }
+
+ public MiniDFSClusterWithNodeGroup(int nameNodePort,
+ Configuration conf,
+ int numDataNodes,
+ boolean format,
+ boolean manageDfsDirs,
+ StartupOption operation,
+ String[] racks,
+ String[] hosts,
+ long[] simulatedCapacities) throws IOException {
+ super(nameNodePort, conf, numDataNodes, format, manageDfsDirs,
+ manageDfsDirs, operation, racks, hosts, simulatedCapacities);
+ }
+
+ // NODE_GROUPS should be set before the constructor is executed.
+ public static void setNodeGroups(String[] nodeGroups) {
+ NODE_GROUPS = nodeGroups;
+ }
+
+ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+ boolean manageDfsDirs, StartupOption operation,
+ String[] racks, String[] nodeGroups, String[] hosts,
+ long[] simulatedCapacities) throws IOException {
+ conf.set("slave.host.name", "127.0.0.1");
+
+ int curDatanodesNum = dataNodes.size();
+ // for miniclusters, the default initialDelay for block reports (BRs) is 0
+ if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
+ }
+ // If minicluster's name node is null assume that the conf has been
+ // set with the right address:port of the name node.
+ //
+ if (nameNode != null) { // set conf from the name node
+ InetSocketAddress nnAddr = nameNode.getNameNodeAddress();
+ int nameNodePort = nnAddr.getPort();
+ FileSystem.setDefaultUri(conf,
+ "hdfs://"+ nnAddr.getHostName() +
+ ":" + Integer.toString(nameNodePort));
+ }
+ if (racks != null && numDataNodes > racks.length ) {
+ throw new IllegalArgumentException( "The length of racks [" + racks.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+
+ if (nodeGroups != null && numDataNodes > nodeGroups.length ) {
+ throw new IllegalArgumentException( "The length of nodeGroups [" + nodeGroups.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+
+ if (hosts != null && numDataNodes > hosts.length ) {
+ throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+ //Generate some hostnames if required
+ if (racks != null && hosts == null) {
+ hosts = new String[numDataNodes];
+ for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
+ hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
+ }
+ }
+
+ if (simulatedCapacities != null
+ && numDataNodes > simulatedCapacities.length) {
+ throw new IllegalArgumentException( "The length of simulatedCapacities ["
+ + simulatedCapacities.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+
+
+ // Set up the right ports for the datanodes
+ conf.set("dfs.datanode.address", "127.0.0.1:0");
+ conf.set("dfs.datanode.http.address", "127.0.0.1:0");
+ conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
+
+ String [] dnArgs = (operation == null ||
+ operation != StartupOption.ROLLBACK) ?
+ null : new String[] {operation.getName()};
+
+ for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
+ Configuration dnConf = new Configuration(conf);
+
+ if (manageDfsDirs) {
+ File dir1 = new File(data_dir, "data"+(2*i+1));
+ File dir2 = new File(data_dir, "data"+(2*i+2));
+ dir1.mkdirs();
+ dir2.mkdirs();
+ if (!dir1.isDirectory() || !dir2.isDirectory()) {
+ throw new IOException("Mkdirs failed to create directory for DataNode "
+ + i + ": " + dir1 + " or " + dir2);
+ }
+ dnConf.set(DataNode.DATA_DIR_KEY, dir1.getPath() + "," + dir2.getPath());
+ }
+ if (simulatedCapacities != null) {
+ dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
+ dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
+ simulatedCapacities[i-curDatanodesNum]);
+ }
+ LOG.info("Starting DataNode " + i + " with "
+ + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+ + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+ if (hosts != null) {
+ dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
+ LOG.info("Starting DataNode " + i + " with hostname set to: "
+ + dnConf.get("slave.host.name"));
+ }
+ if (racks != null) {
+ String name = hosts[i - curDatanodesNum];
+ if (nodeGroups == null) {
+ LOG.info("Adding node with hostname : " + name + " to rack " +
+ racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum]);
+ } else {
+ LOG.info("Adding node with hostname : " + name + " to serverGroup " +
+ nodeGroups[i-curDatanodesNum] + " and rack " +
+ racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum] +
+ nodeGroups[i-curDatanodesNum]);
+ }
+ }
+ Configuration newconf = new Configuration(dnConf); // save config
+ if (hosts != null) {
+ NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
+ }
+ DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+ if(dn == null)
+ throw new IOException("Cannot start DataNode in "
+ + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+ //since the HDFS does things based on IP:port, we need to add the mapping
+ //for IP:port to rackId
+ String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
+ if (racks != null) {
+ int port = dn.getSelfAddr().getPort();
+ if (nodeGroups == null) {
+ LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
+ " to rack " + racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(ipAddr + ":" + port,
+ racks[i-curDatanodesNum]);
+ } else {
+ LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + " to nodeGroup " +
+ nodeGroups[i-curDatanodesNum] + " and rack " + racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i-curDatanodesNum] +
+ nodeGroups[i-curDatanodesNum]);
+ }
+ }
+ DataNode.runDatanodeDaemon(dn);
+ dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
+ }
+ curDatanodesNum += numDataNodes;
+ this.numDataNodes += numDataNodes;
+ waitActive();
+ }
+
+ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+ boolean manageDfsDirs, StartupOption operation,
+ String[] racks, String[] nodeGroups, String[] hosts,
+ long[] simulatedCapacities,
+ boolean setupHostsFile) throws IOException {
+ startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
+ hosts, simulatedCapacities);
+ }
+
+ // This override is used for initialization from the parent class.
+ @Override
+ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+ boolean manageDfsDirs, StartupOption operation,
+ String[] racks, String[] hosts,
+ long[] simulatedCapacities) throws IOException {
+ startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, NODE_GROUPS, hosts,
+ simulatedCapacities);
+ }
+
+}
+
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1422482&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Sun Dec 16 08:28:17 2012
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.net.NetworkTopology;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This class tests if a balancer schedules tasks correctly.
+ */
+public class TestBalancerWithNodeGroup {
+
+ private static final long CAPACITY = 500L; // capacity passed for each datanode in the tests
+ private static final String RACK0 = "/rack0";
+ private static final String RACK1 = "/rack1";
+ private static final String NODEGROUP0 = "/nodegroup0";
+ private static final String NODEGROUP1 = "/nodegroup1";
+ private static final String NODEGROUP2 = "/nodegroup2";
+ private static final String fileName = "/tmp.txt";
+ private static final Path filePath = new Path(fileName);
+ private static final Log LOG = LogFactory.getLog(
+     "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
+ MiniDFSClusterWithNodeGroup cluster; // created and shut down by each test
+ // Handle from DFSClient.createNamenode; used to poll stats and datanode reports.
+ ClientProtocol client;
+ private Balancer balancer;
+
+ static final long TIMEOUT = 20000L; //msec
+ static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
+ static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
+ static final int DEFAULT_BLOCK_SIZE = 5; // used for dfs.block.size and io.bytes.per.checksum
+ private static final Random r = new Random();
+
+ static {
+   Balancer.setBlockMoveWaitTime(1000L);
+ }
+
+ private void initConf(Configuration conf) { // tiny blocks, simulated storage, NodeGroup classes
+   conf.set("net.topology.impl", "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+   conf.set("dfs.block.replicator.classname",
+       "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
+   conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+   conf.setLong("dfs.heartbeat.interval", 1L);
+   conf.setLong("dfs.balancer.movedWinWidth", 2000L);
+   conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+   conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ }
+
+ /** Creates filePath with a length of fileLen via DFSTestUtil, then calls waitReplication on it. */
+ private void createFile(long fileLen, short replicationFactor) throws IOException {
+   final FileSystem dfs = cluster.getFileSystem();
+   final long randomLong = r.nextLong(); // passed as the last argument of createFile
+   DFSTestUtil.createFile(dfs, filePath, fileLen, replicationFactor, randomLong);
+   // Hand off to DFSTestUtil.waitReplication with the same replication factor.
+   DFSTestUtil.waitReplication(dfs, filePath, replicationFactor);
+ }
+
+ // Build a fresh Configuration and apply the test settings from initConf.
+ private Configuration createConf() {
+   final Configuration testConf = new Configuration();
+   initConf(testConf);
+   return testConf;
+ }
+
+ /**
+  * Polls client.getStats() every 100 msec until status[0] is within
+  * CAPACITY_ALLOWED_VARIANCE (relative) of expectedTotalSpace and status[1]
+  * is within the same variance of expectedUsedSpace.
+  * @param expectedUsedSpace value expected in status[1]
+  * @param expectedTotalSpace value expected in status[0]
+  * @throws IOException - if getStats() fails
+  * @throws TimeoutException if still not matching after more than TIMEOUT msec
+  */
+ private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+     throws IOException, TimeoutException {
+   // A non-positive TIMEOUT disables the deadline.
+   final long deadline = (TIMEOUT <= 0L) ? Long.MAX_VALUE
+       : System.currentTimeMillis() + TIMEOUT;
+   for (;;) {
+     final long[] stats = client.getStats();
+     final double totalDiff = Math.abs((double)stats[0] - expectedTotalSpace);
+     final double usedDiff = Math.abs((double)stats[1] - expectedUsedSpace);
+     final boolean totalOk = totalDiff / expectedTotalSpace < CAPACITY_ALLOWED_VARIANCE;
+     final boolean usedOk = usedDiff / expectedUsedSpace < CAPACITY_ALLOWED_VARIANCE;
+     if (totalOk && usedOk) {
+       return; //done
+     }
+     if (System.currentTimeMillis() > deadline) {
+       throw new TimeoutException("Cluster failed to reached expected values of "
+           + "totalSpace (current: " + stats[0]
+           + ", expected: " + expectedTotalSpace
+           + "), or usedSpace (current: " + stats[1]
+           + ", expected: " + expectedUsedSpace
+           + "), in more than " + TIMEOUT + " msec.");
+     }
+     try {
+       Thread.sleep(100L);
+     } catch (InterruptedException ignored) {
+       // interrupts are ignored; the loop just polls again
+     }
+   }
+ }
+
+ /**
+  * Wait until balanced: each datanode gives utilization within
+  * BALANCE_ALLOWED_VARIANCE of the average (totalUsedSpace / totalCapacity).
+  * The datanode report is re-read every 100 msec while a node is out of range.
+  * @throws IOException if the datanode report cannot be fetched
+  * @throws TimeoutException if a node is still out of range after TIMEOUT msec
+  */
+ private void waitForBalancer(long totalUsedSpace, long totalCapacity)
+     throws IOException, TimeoutException {
+   final long failtime = (TIMEOUT <= 0L) ? Long.MAX_VALUE
+       : System.currentTimeMillis() + TIMEOUT;
+   final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
+   boolean balanced;
+   do {
+     DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL);
+     // JUnit's order is (expected, actual): the cluster's own node count is expected.
+     assertEquals(cluster.getDataNodes().size(), datanodeReport.length);
+     balanced = true;
+     for (DatanodeInfo datanode : datanodeReport) {
+       double nodeUtilization = ((double)datanode.getDfsUsed())
+           / datanode.getCapacity();
+       if (Math.abs(avgUtilization - nodeUtilization) >
+           BALANCE_ALLOWED_VARIANCE) {
+         balanced = false;
+         if (System.currentTimeMillis() > failtime) {
+           throw new TimeoutException(
+               "Rebalancing expected avg utilization to become "
+               + avgUtilization + ", but on datanode " + datanode
+               + " it remains at " + nodeUtilization
+               + " after more than " + TIMEOUT + " msec.");
+         }
+         try {
+           Thread.sleep(100);
+         } catch (InterruptedException ignored) {
+         }
+         break; // first unbalanced node found: re-fetch the report and start over
+       }
+     }
+   } while (!balanced);
+ }
+
+ /**
+  * Start balancer with no arguments, require Balancer.SUCCESS, and check that
+  * the cluster is balanced after the run.
+  */
+ private void runBalancer(Configuration conf,
+     long totalUsedSpace, long totalCapacity) throws Exception {
+   // Heartbeats must report the expected totals before rebalancing starts.
+   waitForHeartBeat(totalUsedSpace, totalCapacity);
+   this.balancer = new Balancer(conf);
+   final int exitCode = this.balancer.run(new String[0]);
+   assertEquals(Balancer.SUCCESS, exitCode);
+
+   waitForHeartBeat(totalUsedSpace, totalCapacity);
+   LOG.info("Rebalancing with default factor.");
+   waitForBalancer(totalUsedSpace, totalCapacity);
+ }
+
+ /** Runs the balancer; the exit code may be SUCCESS or NO_MOVE_PROGRESS. */
+ private void runBalancerCanFinish(Configuration conf,
+     long totalUsedSpace, long totalCapacity) throws Exception {
+   waitForHeartBeat(totalUsedSpace, totalCapacity);
+   this.balancer = new Balancer(conf);
+   final int exitCode = this.balancer.run(new String[0]);
+   final boolean ended = exitCode == Balancer.SUCCESS || exitCode == Balancer.NO_MOVE_PROGRESS;
+   Assert.assertTrue(ended);
+   waitForHeartBeat(totalUsedSpace, totalCapacity);
+   LOG.info("Rebalancing ends successful.");
+ }
+
+ /**
+  * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
+  * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
+  * to 60% with 3 replicas, so n2 and n3 hold a replica of every block under
+  * the replica placement policy with NodeGroup. As a result, n2 and n3 are
+  * filled to 80% (60% x 4 / 3), and the node-group balancer policy allows no
+  * block to move from n2 or n3 to n0 or n1. Thus, we expect the balancer to
+  * end in 5 iterations without moving any block.
+  */
+ @Test
+ public void testBalancerEndInNoMoveProgress() throws Exception {
+   final Configuration conf = createConf();
+   final long[] capacities = {CAPACITY, CAPACITY, CAPACITY, CAPACITY};
+   final String[] racks = {RACK0, RACK0, RACK1, RACK1};
+   final String[] nodeGroups = {NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
+
+   // The three arrays must describe the same number of datanodes.
+   final int numOfDatanodes = capacities.length;
+   assertEquals(numOfDatanodes, racks.length);
+   assertEquals(numOfDatanodes, nodeGroups.length);
+   MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+   cluster = new MiniDFSClusterWithNodeGroup(0, conf, numOfDatanodes,
+       true, true, null, racks, capacities);
+   try {
+     cluster.waitActive();
+     client = DFSClient.createNamenode(conf);
+
+     long totalCapacity = 0L;
+     for (int i = 0; i < numOfDatanodes; i++) {
+       totalCapacity += capacities[i];
+     }
+
+     // fill up the cluster to be 60% full, using a single file with 3 replicas
+     final long totalUsedSpace = totalCapacity * 6 / 10;
+     createFile(totalUsedSpace / 3, (short) 3);
+
+     // run balancer which can finish in 5 iterations with no block movement.
+     runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
+
+   } finally {
+     cluster.shutdown();
+   }
+ }
+
+ /**
+  * Create a cluster with even distribution, and a new empty node is added to
+  * the cluster, then test rack locality for balancer policy: after balancing,
+  * the used space summed per rack must be the same on both racks.
+  */
+ @Test
+ public void testBalancerWithRackLocality() throws Exception {
+   Configuration conf = createConf();
+   long[] capacities = new long[]{CAPACITY, CAPACITY};
+   String[] racks = new String[]{RACK0, RACK1};
+   String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
+
+   int numOfDatanodes = capacities.length;
+   assertEquals(numOfDatanodes, racks.length);
+   assertEquals(numOfDatanodes, nodeGroups.length);
+   MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+   cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
+       true, true, null, racks, capacities);
+   try {
+     cluster.waitActive();
+     client = DFSClient.createNamenode(conf);
+
+     long totalCapacity = 0L;
+     for(long capacity : capacities) {
+       totalCapacity += capacity;
+     }
+
+     // fill up the cluster to be 30% full
+     long totalUsedSpace = totalCapacity * 3 / 10;
+     createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+
+     long newCapacity = CAPACITY;
+     String newRack = RACK1;
+     String newNodeGroup = NODEGROUP2;
+
+     // start up an empty node with the same capacity and on the same rack
+     // NOTE(review): this 7-argument call binds to the startDataNodes overload
+     // whose sixth parameter is 'hosts'; confirm newNodeGroup is meant to go there.
+     cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+         new String[]{newNodeGroup}, new long[] {newCapacity});
+
+     totalCapacity += newCapacity;
+
+     // run balancer and validate results
+     runBalancer(conf, totalUsedSpace, totalCapacity);
+
+     DatanodeInfo[] datanodeReport =
+         client.getDatanodeReport(DatanodeReportType.ALL);
+
+     // Sum the used space per rack; the rack is the first half of the location.
+     Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
+     for (DatanodeInfo datanode: datanodeReport) {
+       String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
+       int usedCapacity = (int) datanode.getDfsUsed();
+       Integer previous = rackToUsedCapacity.get(rack);
+       rackToUsedCapacity.put(rack,
+           previous == null ? usedCapacity : usedCapacity + previous);
+     }
+     // JUnit's order is (expected, actual).
+     assertEquals(2, rackToUsedCapacity.size());
+     assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
+
+   } finally {
+     cluster.shutdown();
+   }
+ }
+
+ /**
+  * Create a cluster with even distribution, and a new empty node is added to
+  * the cluster, then run the balancer with the NodeGroup topology and placement
+  * classes configured and check that it reports SUCCESS and balances the cluster.
+  */
+ @Test
+ public void testBalancerWithNodeGroup() throws Exception {
+   Configuration conf = createConf();
+   long[] capacities = new long[]{CAPACITY, CAPACITY};
+   String[] racks = new String[]{RACK0, RACK1};
+   String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
+
+   int numOfDatanodes = capacities.length;
+   assertEquals(numOfDatanodes, racks.length);
+   assertEquals(numOfDatanodes, nodeGroups.length);
+   MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+   cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
+       true, true, null, racks, capacities);
+   try {
+     cluster.waitActive();
+     client = DFSClient.createNamenode(conf);
+     long totalCapacity = 0L;
+     for(long capacity : capacities) {
+       totalCapacity += capacity;
+     }
+
+     // fill up the cluster to be 30% full
+     long totalUsedSpace = totalCapacity*3/10;
+     createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+
+     long newCapacity = CAPACITY;
+     String newRack = RACK1;
+     String newNodeGroup = NODEGROUP2;
+     // start up an empty node with the same capacity and on the same rack
+     // NOTE(review): the sixth argument binds to 'hosts' in the 7-arg overload; confirm intent.
+     cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+         new String[]{newNodeGroup}, new long[] {newCapacity});
+     totalCapacity += newCapacity;
+
+     // run balancer and validate results
+     runBalancer(conf, totalUsedSpace, totalCapacity);
+   } finally {
+     cluster.shutdown();
+   }
+ }
+
+}
\ No newline at end of file