You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/03 07:28:34 UTC
svn commit: r1488848 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/balancer/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/test/java/org/apache/ha...
Author: szetszwo
Date: Mon Jun 3 05:28:34 2013
New Revision: 1488848
URL: http://svn.apache.org/r1488848
Log:
Merge r1414874 and r1414878 from trunk for HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
- copied unchanged from r1414874, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
- copied unchanged from r1414874, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1414874
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun 3 05:28:34 2013
@@ -43,6 +43,9 @@ Release 2.1.0-beta - UNRELEASED
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
with 4-layer network topology. (Junping Du via szetszwo)
+ HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.
+ (Junping Du via szetszwo)
+
IMPROVEMENTS
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1414874
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jun 3 05:28:34 2013
@@ -169,7 +169,7 @@ import org.apache.hadoop.util.ToolRunner
* <ol>
* <li>The cluster is balanced. Exiting
* <li>No block can be moved. Exiting...
- * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
* <li>Received an IO exception: failure reason. Exiting...
* <li>Another balancer is running. Exiting...
* </ol>
@@ -223,7 +223,7 @@ public class Balancer {
private Map<String, BalancerDatanode> datanodes
= new HashMap<String, BalancerDatanode>();
- private NetworkTopology cluster = new NetworkTopology();
+ private NetworkTopology cluster;
final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
@@ -250,7 +250,7 @@ public class Balancer {
* Return true if a block and its proxy are chosen; false otherwise
*/
private boolean chooseBlockAndProxy() {
- // iterate all source's blocks until find a good one
+ // iterate all source's blocks until find a good one
for (Iterator<BalancerBlock> blocks=
source.getBlockIterator(); blocks.hasNext();) {
if (markMovedIfGoodBlock(blocks.next())) {
@@ -294,22 +294,35 @@ public class Balancer {
* @return true if a proxy is found; otherwise false
*/
private boolean chooseProxySource() {
- // check if there is replica which is on the same rack with the target
+ final DatanodeInfo targetDN = target.getDatanode();
+ boolean find = false;
for (BalancerDatanode loc : block.getLocations()) {
- if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
- if (loc.addPendingBlock(this)) {
- proxySource = loc;
+ // check if there is replica which is on the same rack with the target
+ if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+ find = true;
+ // if cluster is not nodegroup aware or the proxy is on the same
+ // nodegroup with target, then we already find the nearest proxy
+ if (!cluster.isNodeGroupAware()
+ || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
return true;
}
}
- }
- // find out a non-busy replica
- for (BalancerDatanode loc : block.getLocations()) {
- if (loc.addPendingBlock(this)) {
- proxySource = loc;
- return true;
+
+ if (!find) {
+ // find out a non-busy replica out of rack of target
+ find = addTo(loc);
}
}
+
+ return find;
+ }
+
+ // add a BalancerDatanode as proxy source for specific block movement
+ private boolean addTo(BalancerDatanode bdn) {
+ if (bdn.addPendingBlock(this)) {
+ proxySource = bdn;
+ return true;
+ }
return false;
}
@@ -687,7 +700,7 @@ public class Balancer {
NodeTask task = tasks.next();
BalancerDatanode target = task.getDatanode();
PendingBlockMove pendingBlock = new PendingBlockMove();
- if ( target.addPendingBlock(pendingBlock) ) {
+ if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation
pendingBlock.source = this;
pendingBlock.target = target;
@@ -788,9 +801,10 @@ public class Balancer {
*/
private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException {
- if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
- BlockPlacementPolicyDefault.class) {
- throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+ if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof
+ BlockPlacementPolicyDefault)) {
+ throw new UnsupportedActionException(
+ "Balancer without BlockPlacementPolicyDefault");
}
}
@@ -805,6 +819,7 @@ public class Balancer {
this.threshold = p.threshold;
this.policy = p.policy;
this.nnc = theblockpool;
+ cluster = NetworkTopology.getInstance(conf);
}
/* Shuffle datanode array */
@@ -915,9 +930,15 @@ public class Balancer {
* Return total number of bytes to move in this iteration
*/
private long chooseNodes() {
- // Match nodes on the same rack first
+ // First, match nodes on the same node group if cluster has nodegroup
+ // awareness
+ if (cluster.isNodeGroupAware()) {
+ chooseNodesOnSameNodeGroup();
+ }
+
+ // Then, match nodes on the same rack
chooseNodes(true);
- // Then match nodes on different racks
+ // At last, match nodes on different racks
chooseNodes(false);
assert (datanodes.size() >= sources.size()+targets.size())
@@ -932,6 +953,102 @@ public class Balancer {
}
return bytesToMove;
}
+
+ /**
+ * Decide all <source, target> pairs where source and target are
+ * on the same NodeGroup
+ */
+ private void chooseNodesOnSameNodeGroup() {
+
+ /* first step: match each overUtilized datanode (source) to
+ * one or more underUtilized datanodes within same NodeGroup(targets).
+ */
+ chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes);
+
+ /* match each remaining overutilized datanode (source) to below average
+ * utilized datanodes within the same NodeGroup(targets).
+ * Note only overutilized datanodes that haven't had that max bytes to move
+ * satisfied in step 1 are selected
+ */
+ chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes);
+
+ /* match each remaining underutilized datanode to above average utilized
+ * datanodes within the same NodeGroup.
+ * Note only underutilized datanodes that have not had that max bytes to
+ * move satisfied in step 1 are selected.
+ */
+ chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes);
+ }
+
+ /**
+ * Match two sets of nodes within the same NodeGroup, one should be source
+ * nodes (utilization > Avg), and the other should be destination nodes
+ * (utilization < Avg).
+ * @param datanodes
+ * @param candidates
+ */
+ private <D extends BalancerDatanode, C extends BalancerDatanode> void
+ chooseOnSameNodeGroup(Collection<D> datanodes, Collection<C> candidates) {
+ for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
+ final D datanode = i.next();
+ for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); );
+ if (!datanode.isMoveQuotaFull()) {
+ i.remove();
+ }
+ }
+ }
+
+ /**
+ * Match one datanode with a set of candidates nodes within the same NodeGroup.
+ */
+ private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup(
+ BalancerDatanode dn, Iterator<T> candidates) {
+ final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates);
+ if (chosen == null) {
+ return false;
+ }
+ if (dn instanceof Source) {
+ matchSourceWithTargetToMove((Source)dn, chosen);
+ } else {
+ matchSourceWithTargetToMove((Source)chosen, dn);
+ }
+ if (!chosen.isMoveQuotaFull()) {
+ candidates.remove();
+ }
+ return true;
+ }
+
+ private void matchSourceWithTargetToMove(
+ Source source, BalancerDatanode target) {
+ long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ }
+
+ /** choose a datanode from <code>candidates</code> within the same NodeGroup
+ * of <code>dn</code>.
+ */
+ private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup(
+ BalancerDatanode dn, Iterator<T> candidates) {
+ if (dn.isMoveQuotaFull()) {
+ for(; candidates.hasNext(); ) {
+ final T c = candidates.next();
+ if (!c.isMoveQuotaFull()) {
+ candidates.remove();
+ continue;
+ }
+ if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
+ return c;
+ }
+ }
+ }
+ return null;
+ }
/* if onRack is true, decide all <source, target> pairs
* where source and target are on the same rack; Otherwise
@@ -942,33 +1059,33 @@ public class Balancer {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
- chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+ chooseTargets(underUtilizedDatanodes, onRack);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
- chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+ chooseTargets(belowAvgUtilizedDatanodes, onRack);
- /* match each remaining underutilized datanode to
- * above average utilized datanodes.
+ /* match each remaining underutilized datanode (target) to
+ * above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
- chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+ chooseSources(aboveAvgUtilizedDatanodes, onRack);
}
/* choose targets from the target candidate list for each over utilized
* source datanode. OnRackTarget determines if the chosen target
* should be on the same rack as the source
*/
- private void chooseTargets(
- Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
+ private void chooseTargets(
+ Collection<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
srcIterator.hasNext();) {
Source source = srcIterator.next();
- while (chooseTarget(source, targetCandidates, onRackTarget)) {
+ while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
}
if (!source.isMoveQuotaFull()) {
srcIterator.remove();
@@ -982,11 +1099,11 @@ public class Balancer {
* should be on the same rack as the target
*/
private void chooseSources(
- Iterator<Source> sourceCandidates, boolean onRackSource) {
+ Collection<Source> sourceCandidates, boolean onRackSource) {
for (Iterator<BalancerDatanode> targetIterator =
underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
BalancerDatanode target = targetIterator.next();
- while (chooseSource(target, sourceCandidates, onRackSource)) {
+ while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
}
if (!target.isMoveQuotaFull()) {
targetIterator.remove();
@@ -1026,23 +1143,15 @@ public class Balancer {
}
if (foundTarget) {
assert(target != null):"Choose a null target";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
+ matchSourceWithTargetToMove(source, target);
if (!target.isMoveQuotaFull()) {
targetCandidates.remove();
}
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode + " to " + target.datanode);
return true;
}
return false;
}
-
+
/* For the given target, choose sources from the source candidate list.
* OnRackSource determines if the chosen source
* should be on the same rack as the target
@@ -1074,18 +1183,10 @@ public class Balancer {
}
if (foundSource) {
assert(source != null):"Choose a null source";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
+ matchSourceWithTargetToMove(source, target);
if ( !source.isMoveQuotaFull()) {
- sourceCandidates.remove();
- }
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode + " to " + target.datanode);
+ sourceCandidates.remove();
+ }
return true;
}
return false;
@@ -1227,6 +1328,10 @@ public class Balancer {
if (block.isLocatedOnDatanode(target)) {
return false;
}
+ if (cluster.isNodeGroupAware() &&
+ isOnSameNodeGroupWithReplicas(target, block, source)) {
+ return false;
+ }
boolean goodBlock = false;
if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@@ -1258,10 +1363,32 @@ public class Balancer {
}
return goodBlock;
}
-
+
+ /**
+ * Check if there are any replica (other than source) on the same node group
+ * with target. If true, then target is not a good candidate for placing
+ * specific block replica as we don't want 2 replicas under the same nodegroup
+ * after balance.
+ * @param target targetDataNode
+ * @param block dataBlock
+ * @param source sourceDataNode
+ * @return true if there are any replica (other than source) on the same node
+ * group with target
+ */
+ private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+ BalancerBlock block, Source source) {
+ for (BalancerDatanode loc : block.locations) {
+ if (loc != source &&
+ cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/* reset all fields in a balancer preparing for the next iteration */
- private void resetData() {
- this.cluster = new NetworkTopology();
+ private void resetData(Configuration conf) {
+ this.cluster = NetworkTopology.getInstance(conf);
this.overUtilizedDatanodes.clear();
this.aboveAvgUtilizedDatanodes.clear();
this.belowAvgUtilizedDatanodes.clear();
@@ -1333,7 +1460,8 @@ public class Balancer {
}
/** Run an iteration for all datanodes. */
- private ReturnStatus run(int iteration, Formatter formatter) {
+ private ReturnStatus run(int iteration, Formatter formatter,
+ Configuration conf) {
try {
/* get all live datanodes of a cluster and their disk usage
* decide the number of bytes need to be moved
@@ -1387,7 +1515,7 @@ public class Balancer {
}
// clean all lists
- resetData();
+ resetData(conf);
return ReturnStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
@@ -1435,7 +1563,7 @@ public class Balancer {
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
- final ReturnStatus r = b.run(iteration, formatter);
+ final ReturnStatus r = b.run(iteration, formatter, conf);
if (r == ReturnStatus.IN_PROGRESS) {
done = false;
} else if (r != ReturnStatus.SUCCESS) {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Jun 3 05:28:34 2013
@@ -39,7 +39,6 @@ import org.apache.hadoop.HadoopIllegalAr
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -163,11 +162,7 @@ public class DatanodeManager {
this.namesystem = namesystem;
this.blockManager = blockManager;
- Class<? extends NetworkTopology> networkTopologyClass =
- conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
- NetworkTopology.class, NetworkTopology.class);
- networktopology = (NetworkTopology) ReflectionUtils.newInstance(
- networkTopologyClass, conf);
+ networktopology = NetworkTopology.getInstance(conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Jun 3 05:28:34 2013
@@ -320,7 +320,7 @@ public class MiniDFSCluster {
/**
* Used by builder to create and return an instance of MiniDFSCluster
*/
- private MiniDFSCluster(Builder builder) throws IOException {
+ protected MiniDFSCluster(Builder builder) throws IOException {
if (builder.nnTopology == null) {
// If no topology is specified, build a single NN.
builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
@@ -368,7 +368,7 @@ public class MiniDFSCluster {
private Configuration conf;
private NameNodeInfo[] nameNodes;
- private int numDataNodes;
+ protected int numDataNodes;
protected ArrayList<DataNodeProperties> dataNodes =
new ArrayList<DataNodeProperties>();
private File base_dir;
@@ -2318,7 +2318,7 @@ public class MiniDFSCluster {
return nameNodes[nnIndex].nameNode;
}
- private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
+ protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
if (setupHostsFile) {
String hostsFile = conf.get(DFS_HOSTS, "").trim();