You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/12/16 09:39:21 UTC
svn commit: r1422484 - in /hadoop/common/branches/branch-1-win: ./
src/core/org/apache/hadoop/net/
src/hdfs/org/apache/hadoop/hdfs/server/balancer/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/ src/test/org/apache/ha...
Author: szetszwo
Date: Sun Dec 16 08:39:19 2012
New Revision: 1422484
URL: http://svn.apache.org/viewvc?rev=1422484&view=rev
Log:
svn merge -c 1422482 from branch-1 for HDFS-3942. Backport HDFS-3495 and HDFS-4234: Update Balancer to support new NetworkTopology with NodeGroup and use generic code for choosing datanode in Balancer.
Added:
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
- copied unchanged from r1422482, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
- copied unchanged from r1422482, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified:
hadoop/common/branches/branch-1-win/ (props changed)
hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
hadoop/common/branches/branch-1-win/CHANGES.txt (props changed)
hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
Propchange: hadoop/common/branches/branch-1-win/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-1:r1422482
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1422484&r1=1422483&r2=1422484&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Sun Dec 16 08:39:19 2012
@@ -295,3 +295,7 @@ Branch-hadoop-1-win (branched from branc
HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat
(Ahmed Radwan, backported by suresh)
+
+ HDFS-3942. Backport HDFS-3495 and HDFS-4234: Update Balancer to support new
+ NetworkTopology with NodeGroup and use generic code for choosing datanode
+ in Balancer. (Junping Du via szetszwo)
Propchange: hadoop/common/branches/branch-1-win/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1422482
Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1422484&r1=1422483&r2=1422484&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java Sun Dec 16 08:39:19 2012
@@ -28,6 +28,8 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
/** The class represents a cluster of computer with a tree hierarchical
* network topology.
@@ -334,6 +336,19 @@ public class NetworkTopology {
public NetworkTopology() {
clusterMap = new InnerNode(InnerNode.ROOT);
}
+
+ /**
+ * Get an instance of NetworkTopology based on the value of the configuration
+ * parameter net.topology.impl.
+ *
+ * @param conf the configuration to be used
+ * @return an instance of NetworkTopology
+ */
+ public static NetworkTopology getInstance(Configuration conf){
+ return (NetworkTopology) ReflectionUtils.newInstance(
+ conf.getClass("net.topology.impl", NetworkTopology.class,
+ NetworkTopology.class), conf);
+ }
/** Add a leaf node
* Update node counter & rack counter if neccessary
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1422484&r1=1422483&r2=1422484&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sun Dec 16 08:39:19 2012
@@ -60,7 +60,6 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
@@ -79,6 +78,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
@@ -173,7 +173,7 @@ import org.apache.hadoop.util.ToolRunner
* <ol>
* <li>The cluster is balanced. Exiting
* <li>No block can be moved. Exiting...
- * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
* <li>Received an IO exception: failure reason. Exiting...
* <li>Another balancer is running. Exiting...
* </ol>
@@ -232,7 +232,7 @@ public class Balancer implements Tool {
private Map<String, BalancerDatanode> datanodes
= new HashMap<String, BalancerDatanode>();
- private NetworkTopology cluster = new NetworkTopology();
+ private NetworkTopology cluster;
private BlockPlacementPolicy policy;
private double avgUtilization = 0.0D;
@@ -549,7 +549,7 @@ public class Balancer implements Tool {
}
/** Decide if still need to move more bytes */
- protected boolean isMoveQuotaFull() {
+ protected boolean hasSpaceForScheduling() {
return scheduledSize<maxSizeToMove;
}
@@ -753,8 +753,8 @@ public class Balancer implements Tool {
long startTime = Util.now();
this.blocksToReceive = 2*scheduledSize;
boolean isTimeUp = false;
- while(!isTimeUp && scheduledSize>0 &&
- (!srcBlockList.isEmpty() || blocksToReceive>0)) {
+ while(!isTimeUp && scheduledSize > 0 &&
+ (!srcBlockList.isEmpty() || blocksToReceive > 0)) {
PendingBlockMove pendingBlock = chooseNextBlockToMove();
if (pendingBlock != null) {
// move the block
@@ -1069,6 +1069,36 @@ public class Balancer implements Tool {
LOG.info(msg);
}
+ /** A matcher interface for matching nodes. */
+ private interface Matcher {
+ /** Given the cluster topology, does the left node match the right node? */
+ boolean match(NetworkTopology cluster, Node left, Node right);
+ }
+
+ /** Match datanodes in the same node group. */
+ static final Matcher SAME_NODE_GROUP = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return cluster.isOnSameNodeGroup(left, right);
+ }
+ };
+
+ /** Match datanodes in the same rack. */
+ static final Matcher SAME_RACK = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return cluster.isOnSameRack(left, right);
+ }
+ };
+
+ /** Match any datanode with any other datanode. */
+ static final Matcher ANY_OTHER = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return left != right;
+ }
+ };
+
/* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target
* Maximum bytes to be moved per node is
@@ -1076,10 +1106,15 @@ public class Balancer implements Tool {
* Return total number of bytes to move in this iteration
*/
private long chooseNodes() {
- // Match nodes on the same rack first
- chooseNodes(true);
- // Then match nodes on different racks
- chooseNodes(false);
+ // First, match nodes on the same node group if cluster is node group aware
+ if (cluster.isNodeGroupAware()) {
+ chooseNodes(SAME_NODE_GROUP);
+ }
+
+ // Then, match nodes on the same rack
+ chooseNodes(SAME_RACK);
+ // At last, match all remaining nodes
+ chooseNodes(ANY_OTHER);
assert (datanodes.size() ==
overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -1093,167 +1128,99 @@ public class Balancer implements Tool {
}
return bytesToMove;
}
-
- /* if onRack is true, decide all <source, target> pairs
- * where source and target are on the same rack; Otherwise
- * decide all <source, target> pairs where source and target are
- * on different racks
- */
- private void chooseNodes(boolean onRack) {
+
+ /** Decide all <source, target> pairs according to the matcher. */
+ private void chooseNodes(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
- chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+ chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
- chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+ chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
- /* match each remaining underutilized datanode to
- * above average utilized datanodes.
+ /* match each remaining underutilized datanode (target) to
+ * above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
- chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+ chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
}
-
- /* choose targets from the target candidate list for each over utilized
- * source datanode. OnRackTarget determines if the chosen target
- * should be on the same rack as the source
+
+ /**
+ * For each datanode, choose matching nodes from the candidates. Either the
+ * datanodes or the candidates are source nodes with (utilization > Avg), and
+ * the others are target nodes with (utilization < Avg).
*/
- private void chooseTargets(
- Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
- for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
- srcIterator.hasNext();) {
- Source source = srcIterator.next();
- while (chooseTarget(source, targetCandidates, onRackTarget)) {
- }
- if (!source.isMoveQuotaFull()) {
- srcIterator.remove();
+ private <D extends BalancerDatanode, C extends BalancerDatanode> void
+ chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+ Matcher matcher) {
+ for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
+ final D datanode = i.next();
+ for(; chooseForOneDatanode(datanode, candidates, matcher); );
+ if (!datanode.hasSpaceForScheduling()) {
+ i.remove();
}
}
- return;
}
- /* choose sources from the source candidate list for each under utilized
- * target datanode. onRackSource determines if the chosen source
- * should be on the same rack as the target
+ /**
+ * For the given datanode, choose a candidate and then schedule it.
+ * @return true if a candidate is chosen; false if no candidate is chosen.
*/
- private void chooseSources(
- Iterator<Source> sourceCandidates, boolean onRackSource) {
- for (Iterator<BalancerDatanode> targetIterator =
- underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
- BalancerDatanode target = targetIterator.next();
- while (chooseSource(target, sourceCandidates, onRackSource)) {
- }
- if (!target.isMoveQuotaFull()) {
- targetIterator.remove();
- }
- }
- return;
- }
+ private <C extends BalancerDatanode> boolean chooseForOneDatanode(
+ BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+ final Iterator<C> i = candidates.iterator();
+ final C chosen = chooseCandidate(dn, i, matcher);
- /* For the given source, choose targets from the target candidate list.
- * OnRackTarget determines if the chosen target
- * should be on the same rack as the source
- */
- private boolean chooseTarget(Source source,
- Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
- if (!source.isMoveQuotaFull()) {
+ if (chosen == null) {
return false;
}
- boolean foundTarget = false;
- BalancerDatanode target = null;
- while (!foundTarget && targetCandidates.hasNext()) {
- target = targetCandidates.next();
- if (!target.isMoveQuotaFull()) {
- targetCandidates.remove();
- continue;
- }
- if (onRackTarget) {
- // choose from on-rack nodes
- if (cluster.isOnSameRack(source.datanode, target.datanode)) {
- foundTarget = true;
- }
- } else {
- // choose from off-rack nodes
- if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
- foundTarget = true;
- }
- }
+ if (dn instanceof Source) {
+ matchSourceWithTargetToMove((Source)dn, chosen);
+ } else {
+ matchSourceWithTargetToMove((Source)chosen, dn);
}
- if (foundTarget) {
- assert(target != null):"Choose a null target";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
- if (!target.isMoveQuotaFull()) {
- targetCandidates.remove();
- }
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode.getName() + " to " + target.datanode.getName());
- return true;
+ if (!chosen.hasSpaceForScheduling()) {
+ i.remove();
}
- return false;
+ return true;
}
- /* For the given target, choose sources from the source candidate list.
- * OnRackSource determines if the chosen source
- * should be on the same rack as the target
- */
- private boolean chooseSource(BalancerDatanode target,
- Iterator<Source> sourceCandidates, boolean onRackSource) {
- if (!target.isMoveQuotaFull()) {
- return false;
- }
- boolean foundSource = false;
- Source source = null;
- while (!foundSource && sourceCandidates.hasNext()) {
- source = sourceCandidates.next();
- if (!source.isMoveQuotaFull()) {
- sourceCandidates.remove();
- continue;
- }
- if (onRackSource) {
- // choose from on-rack nodes
- if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
- foundSource = true;
- }
- } else {
- // choose from off-rack nodes
- if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
- foundSource = true;
+ private void matchSourceWithTargetToMove(
+ Source source, BalancerDatanode target) {
+ long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ }
+
+ /** Choose a candidate for the given datanode. */
+ private <D extends BalancerDatanode, C extends BalancerDatanode>
+ C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
+ if (dn.hasSpaceForScheduling()) {
+ for(; candidates.hasNext(); ) {
+ final C c = candidates.next();
+ if (!c.hasSpaceForScheduling()) {
+ candidates.remove();
+ } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+ return c;
}
}
}
- if (foundSource) {
- assert(source != null):"Choose a null source";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
- if ( !source.isMoveQuotaFull()) {
- sourceCandidates.remove();
- }
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode.getName() + " to " + target.datanode.getName());
- return true;
- }
- return false;
+ return null;
}
private static class BytesMoved {
- private long bytesMoved = 0L;;
+ private long bytesMoved = 0L;
private synchronized void inc( long bytes ) {
bytesMoved += bytes;
}
@@ -1395,6 +1362,11 @@ public class Balancer implements Tool {
if (block.isLocatedOnDatanode(target)) {
return false;
}
+
+ if (cluster.isNodeGroupAware() &&
+ isOnSameNodeGroupWithReplicas(target, block, source)) {
+ return false;
+ }
boolean goodBlock = false;
synchronized (block) {
@@ -1409,16 +1381,38 @@ public class Balancer implements Tool {
return goodBlock;
}
+ /**
+ * Check whether any replica (other than the one on source) is on the same
+ * node group as target. If so, target is not a good candidate for placing
+ * this block replica, as we don't want 2 replicas under the same node group
+ * after balancing.
+ * @param target targetDataNode
+ * @param block dataBlock
+ * @param source sourceDataNode
+ * @return true if any replica (other than the one on source) is on the same
+ * node group as target
+ */
+ private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+ BalancerBlock block, Source source) {
+ for (BalancerDatanode loc : block.locations) {
+ if (loc != source &&
+ cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/* reset all fields in a balancer preparing for the next iteration */
private void resetData() {
- this.cluster = new NetworkTopology();
+ this.cluster = NetworkTopology.getInstance(conf);
this.overUtilizedDatanodes.clear();
this.aboveAvgUtilizedDatanodes.clear();
this.belowAvgUtilizedDatanodes.clear();
this.underUtilizedDatanodes.clear();
this.datanodes.clear();
this.sources.clear();
- this.targets.clear();
+ this.targets.clear();
this.avgUtilization = 0.0D;
cleanGlobalBlockList();
this.movedBlocks.cleanup();
@@ -1491,7 +1485,7 @@ public class Balancer implements Tool {
Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
int iterations = 0;
- while (true ) {
+ while (true) {
/* get all live datanodes of a cluster and their disk usage
* decide the number of bytes need to be moved
*/
@@ -1641,6 +1635,7 @@ public class Balancer implements Tool {
/** set this balancer's configuration */
public void setConf(Configuration conf) {
this.conf = conf;
+ this.cluster = NetworkTopology.getInstance(conf);
movedBlocks.setWinWidth(conf);
}
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1422484&r1=1422483&r2=1422484&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Dec 16 08:39:19 2012
@@ -484,9 +484,7 @@ public class FSNamesystem implements FSC
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
- this.clusterMap = (NetworkTopology) ReflectionUtils.newInstance(
- conf.getClass("net.topology.impl", NetworkTopology.class,
- NetworkTopology.class), conf);
+ this.clusterMap = NetworkTopology.getInstance(conf);
this.replicator = BlockPlacementPolicy.getInstance(conf, this, clusterMap);
@@ -3598,7 +3596,6 @@ public class FSNamesystem implements FSC
// Is the block being reported the last block of an underconstruction file?
boolean blockUnderConstruction = false;
if (fileINode.isUnderConstruction()) {
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
Block last = fileINode.getLastBlock();
if (last == null) {
// This should never happen, but better to handle it properly than to throw
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1422484&r1=1422483&r2=1422484&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Sun Dec 16 08:39:19 2012
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -70,13 +71,12 @@ public class MiniDFSCluster {
}
private Configuration conf;
- private NameNode nameNode;
- private int numDataNodes;
- private ArrayList<DataNodeProperties> dataNodes =
+ protected NameNode nameNode;
+ protected int numDataNodes;
+ protected List<DataNodeProperties> dataNodes =
new ArrayList<DataNodeProperties>();
private File base_dir;
- private File data_dir;
-
+ protected File data_dir;
/**
* This null constructor is used only when wishing to start a data node cluster
@@ -446,8 +446,6 @@ public class MiniDFSCluster {
waitActive();
}
-
-
/**
* Modify the config and start up the DataNodes. The info port for
* DataNodes is guaranteed to use a free port.