You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/03 08:11:22 UTC
svn commit: r1488855 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/balancer/
src/test/java/org/apache/hadoop/hdfs/server/balancer/
Author: szetszwo
Date: Mon Jun 3 06:11:22 2013
New Revision: 1488855
URL: http://svn.apache.org/r1488855
Log:
svn merge -c 1430917 from trunk for HDFS-4261. Fix bugs in Balancer causing infinite loop and TestBalancerWithNodeGroup timing out.
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1430917
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun 3 06:11:22 2013
@@ -311,6 +311,9 @@ Release 2.1.0-beta - UNRELEASED
the nodes in the same nodegroup should also be excluded. (Junping Du
via szetszwo)
+ HDFS-4261. Fix bugs in Balancer causing infinite loop and
+ TestBalancerWithNodeGroup timing out. (Junping Du via szetszwo)
+
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1430917
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jun 3 06:11:22 2013
@@ -190,6 +190,7 @@ public class Balancer {
* balancing purpose at a datanode
*/
public static final int MAX_NUM_CONCURRENT_MOVES = 5;
+ public static final int MAX_NO_PENDING_BLOCK_INTERATIONS = 5;
private static final String USAGE = "Usage: java "
+ Balancer.class.getSimpleName()
@@ -225,7 +226,6 @@ public class Balancer {
= new HashMap<String, BalancerDatanode>();
private NetworkTopology cluster;
-
final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@@ -753,6 +753,7 @@ public class Balancer {
long startTime = Time.now();
this.blocksToReceive = 2*scheduledSize;
boolean isTimeUp = false;
+ int noPendingBlockIteration = 0;
while(!isTimeUp && scheduledSize>0 &&
(!srcBlockList.isEmpty() || blocksToReceive>0)) {
PendingBlockMove pendingBlock = chooseNextBlockToMove();
@@ -776,7 +777,15 @@ public class Balancer {
LOG.warn("Exception while getting block list", e);
return;
}
- }
+ } else {
+ // source node cannot find a pendingBlockToMove, iteration +1
+ noPendingBlockIteration++;
+ // in case no blocks can be moved for source node's task,
+ // jump out of while-loop after 5 iterations.
+ if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_INTERATIONS) {
+ scheduledSize = 0;
+ }
+ }
// check if time is up or not
if (Time.now()-startTime > MAX_ITERATION_TIME) {
@@ -802,8 +811,8 @@ public class Balancer {
*/
private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException {
- if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof
- BlockPlacementPolicyDefault) {
+ if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof
+ BlockPlacementPolicyDefault)) {
throw new UnsupportedActionException(
"Balancer without BlockPlacementPolicyDefault");
}
@@ -1086,7 +1095,6 @@ public class Balancer {
}
};
private BytesMoved bytesMoved = new BytesMoved();
- private int notChangedIterations = 0;
/* Start a thread to dispatch block moves for each source.
* The thread selects blocks to move & sends request to proxy source to
@@ -1386,19 +1394,10 @@ public class Balancer {
* available to move.
* Exit no byte has been moved for 5 consecutive iterations.
*/
- if (dispatchBlockMoves() > 0) {
- notChangedIterations = 0;
- } else {
- notChangedIterations++;
- if (notChangedIterations >= 5) {
- System.out.println(
- "No block has been moved for 5 iterations. Exiting...");
- return ReturnStatus.NO_MOVE_PROGRESS;
- }
+ if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
+ return ReturnStatus.NO_MOVE_PROGRESS;
}
- // clean all lists
- resetData(conf);
return ReturnStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
@@ -1447,6 +1446,8 @@ public class Balancer {
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
final ReturnStatus r = b.run(iteration, formatter, conf);
+ // clean all lists
+ b.resetData(conf);
if (r == ReturnStatus.IN_PROGRESS) {
done = false;
} else if (r != ReturnStatus.SUCCESS) {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Mon Jun 3 06:11:22 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.util.Daemon;
class NameNodeConnector {
private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+ private static final int MAX_NOT_CHANGED_INTERATIONS = 5;
final URI nameNodeUri;
final String blockpoolID;
@@ -65,6 +66,8 @@ class NameNodeConnector {
private final boolean encryptDataTransfer;
private boolean shouldRun;
private long keyUpdaterInterval;
+ // used for balancer
+ private int notChangedIterations = 0;
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey;
@@ -119,6 +122,20 @@ class NameNodeConnector {
}
}
+ boolean shouldContinue(long dispatchBlockMoveBytes) {
+ if (dispatchBlockMoveBytes > 0) {
+ notChangedIterations = 0;
+ } else {
+ notChangedIterations++;
+ if (notChangedIterations >= MAX_NOT_CHANGED_INTERATIONS) {
+ System.out.println("No block has been moved for "
+ + notChangedIterations + " iterations. Exiting...");
+ return false;
+ }
+ }
+ return true;
+ }
+
/** Get an access token for a block. */
Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
) throws IOException {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Mon Jun 3 06:11:22 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.net.NetworkTopology;
import org.junit.Test;
+import junit.framework.Assert;
/**
* This class tests if a balancer schedules tasks correctly.
@@ -174,12 +175,25 @@ public class TestBalancerWithNodeGroup {
LOG.info("Rebalancing with default factor.");
waitForBalancer(totalUsedSpace, totalCapacity);
}
+
+ private void runBalancerCanFinish(Configuration conf,
+ long totalUsedSpace, long totalCapacity) throws Exception {
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+ // start rebalancing
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+ Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
+ (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+ LOG.info("Rebalancing with default factor.");
+ }
/**
* Create a cluster with even distribution, and a new empty node is added to
* the cluster, then test rack locality for balancer policy.
*/
- @Test
+ @Test(timeout=60000)
public void testBalancerWithRackLocality() throws Exception {
Configuration conf = createConf();
long[] capacities = new long[]{CAPACITY, CAPACITY};
@@ -217,7 +231,7 @@ public class TestBalancerWithNodeGroup {
totalCapacity += newCapacity;
// run balancer and validate results
- runBalancer(conf, totalUsedSpace, totalCapacity);
+ runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
DatanodeInfo[] datanodeReport =
client.getDatanodeReport(DatanodeReportType.ALL);
@@ -245,7 +259,7 @@ public class TestBalancerWithNodeGroup {
* Create a cluster with even distribution, and a new empty node is added to
* the cluster, then test node-group locality for balancer policy.
*/
- @Test
+ @Test(timeout=60000)
public void testBalancerWithNodeGroup() throws Exception {
Configuration conf = createConf();
long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
@@ -289,4 +303,49 @@ public class TestBalancerWithNodeGroup {
cluster.shutdown();
}
}
+
+ /**
+ * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
+ * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
+ * to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according
+ * to replica placement policy with NodeGroup. As a result, n2 and n3 will be
+ * filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3
+ * to n0 or n1 as balancer policy with node group. Thus, we expect the balancer
+ * to end in 5 iterations without move block process.
+ */
+ @Test(timeout=60000)
+ public void testBalancerEndInNoMoveProgress() throws Exception {
+ Configuration conf = createConf();
+ long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
+ String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
+ String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
+
+ int numOfDatanodes = capacities.length;
+ assertEquals(numOfDatanodes, racks.length);
+ assertEquals(numOfDatanodes, nodeGroups.length);
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(capacities.length)
+ .racks(racks)
+ .simulatedCapacities(capacities);
+ MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+ cluster = new MiniDFSClusterWithNodeGroup(builder);
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+
+ long totalCapacity = TestBalancer.sum(capacities);
+ // fill up the cluster to be 60% full
+ long totalUsedSpace = totalCapacity * 6 / 10;
+ TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3,
+ (short) (3), 0);
+
+ // run balancer which can finish in 5 iterations with no block movement.
+ runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
}