You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/03 08:11:22 UTC

svn commit: r1488855 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/test/java/org/apache/hadoop/hdfs/server/balancer/

Author: szetszwo
Date: Mon Jun  3 06:11:22 2013
New Revision: 1488855

URL: http://svn.apache.org/r1488855
Log:
svn merge -c 1430917 from trunk for HDFS-4261. Fix bugs in Balaner causing infinite loop and TestBalancerWithNodeGroup timeing out.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1430917

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun  3 06:11:22 2013
@@ -311,6 +311,9 @@ Release 2.1.0-beta - UNRELEASED
     the nodes in the same nodegroup should also be excluded.  (Junping Du
     via szetszwo)
 
+    HDFS-4261. Fix bugs in Balaner causing infinite loop and
+    TestBalancerWithNodeGroup timeing out.  (Junping Du via szetszwo)
+
   BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
 
     HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1430917

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jun  3 06:11:22 2013
@@ -190,6 +190,7 @@ public class Balancer {
    * balancing purpose at a datanode
    */
   public static final int MAX_NUM_CONCURRENT_MOVES = 5;
+  public static final int MAX_NO_PENDING_BLOCK_INTERATIONS = 5;
   
   private static final String USAGE = "Usage: java "
       + Balancer.class.getSimpleName()
@@ -225,7 +226,6 @@ public class Balancer {
                  = new HashMap<String, BalancerDatanode>();
   
   private NetworkTopology cluster;
-  
   final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
     Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@@ -753,6 +753,7 @@ public class Balancer {
       long startTime = Time.now();
       this.blocksToReceive = 2*scheduledSize;
       boolean isTimeUp = false;
+      int noPendingBlockIteration = 0;
       while(!isTimeUp && scheduledSize>0 &&
           (!srcBlockList.isEmpty() || blocksToReceive>0)) {
         PendingBlockMove pendingBlock = chooseNextBlockToMove();
@@ -776,7 +777,15 @@ public class Balancer {
             LOG.warn("Exception while getting block list", e);
             return;
           }
-        } 
+        } else {
+          // source node cannot find a pendingBlockToMove, iteration +1
+          noPendingBlockIteration++;
+          // in case no blocks can be moved for source node's task,
+          // jump out of while-loop after 5 iterations.
+          if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_INTERATIONS) {
+            scheduledSize = 0;
+          }
+        }
         
         // check if time is up or not
         if (Time.now()-startTime > MAX_ITERATION_TIME) {
@@ -802,8 +811,8 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
-        BlockPlacementPolicyDefault) {
+    if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
+        BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");
     }
@@ -1086,7 +1095,6 @@ public class Balancer {
     }
   };
   private BytesMoved bytesMoved = new BytesMoved();
-  private int notChangedIterations = 0;
   
   /* Start a thread to dispatch block moves for each source. 
    * The thread selects blocks to move & sends request to proxy source to
@@ -1386,19 +1394,10 @@ public class Balancer {
        * available to move.
        * Exit no byte has been moved for 5 consecutive iterations.
        */
-      if (dispatchBlockMoves() > 0) {
-        notChangedIterations = 0;
-      } else {
-        notChangedIterations++;
-        if (notChangedIterations >= 5) {
-          System.out.println(
-              "No block has been moved for 5 iterations. Exiting...");
-          return ReturnStatus.NO_MOVE_PROGRESS;
-        }
+      if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
+        return ReturnStatus.NO_MOVE_PROGRESS;
       }
 
-      // clean all lists
-      resetData(conf);
       return ReturnStatus.IN_PROGRESS;
     } catch (IllegalArgumentException e) {
       System.out.println(e + ".  Exiting ...");
@@ -1447,6 +1446,8 @@ public class Balancer {
         for(NameNodeConnector nnc : connectors) {
           final Balancer b = new Balancer(nnc, p, conf);
           final ReturnStatus r = b.run(iteration, formatter, conf);
+          // clean all lists
+          b.resetData(conf);
           if (r == ReturnStatus.IN_PROGRESS) {
             done = false;
           } else if (r != ReturnStatus.SUCCESS) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Mon Jun  3 06:11:22 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.util.Daemon;
 class NameNodeConnector {
   private static final Log LOG = Balancer.LOG;
   private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+  private static final int MAX_NOT_CHANGED_INTERATIONS = 5;
 
   final URI nameNodeUri;
   final String blockpoolID;
@@ -65,6 +66,8 @@ class NameNodeConnector {
   private final boolean encryptDataTransfer;
   private boolean shouldRun;
   private long keyUpdaterInterval;
+  // used for balancer
+  private int notChangedIterations = 0;
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
   private DataEncryptionKey encryptionKey;
@@ -119,6 +122,20 @@ class NameNodeConnector {
     }
   }
 
+  boolean shouldContinue(long dispatchBlockMoveBytes) {
+    if (dispatchBlockMoveBytes > 0) {
+      notChangedIterations = 0;
+    } else {
+      notChangedIterations++;
+      if (notChangedIterations >= MAX_NOT_CHANGED_INTERATIONS) {
+        System.out.println("No block has been moved for "
+            + notChangedIterations + " iterations. Exiting...");
+        return false;
+      }
+    }
+    return true;
+  }
+  
   /** Get an access token for a block. */
   Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
       ) throws IOException {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1488855&r1=1488854&r2=1488855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Mon Jun  3 06:11:22 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.net.NetworkTopology;
 import org.junit.Test;
+import junit.framework.Assert;
 
 /**
  * This class tests if a balancer schedules tasks correctly.
@@ -174,12 +175,25 @@ public class TestBalancerWithNodeGroup {
     LOG.info("Rebalancing with default factor.");
     waitForBalancer(totalUsedSpace, totalCapacity);
   }
+  
+  private void runBalancerCanFinish(Configuration conf,
+      long totalUsedSpace, long totalCapacity) throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+    // start rebalancing
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
+        (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    LOG.info("Rebalancing with default factor.");
+  }
 
   /**
    * Create a cluster with even distribution, and a new empty node is added to
    * the cluster, then test rack locality for balancer policy. 
    */
-  @Test
+  @Test(timeout=60000)
   public void testBalancerWithRackLocality() throws Exception {
     Configuration conf = createConf();
     long[] capacities = new long[]{CAPACITY, CAPACITY};
@@ -217,7 +231,7 @@ public class TestBalancerWithNodeGroup {
       totalCapacity += newCapacity;
 
       // run balancer and validate results
-      runBalancer(conf, totalUsedSpace, totalCapacity);
+      runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
       
       DatanodeInfo[] datanodeReport = 
               client.getDatanodeReport(DatanodeReportType.ALL);
@@ -245,7 +259,7 @@ public class TestBalancerWithNodeGroup {
    * Create a cluster with even distribution, and a new empty node is added to
    * the cluster, then test node-group locality for balancer policy.
    */
-  @Test
+  @Test(timeout=60000)
   public void testBalancerWithNodeGroup() throws Exception {
     Configuration conf = createConf();
     long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
@@ -289,4 +303,49 @@ public class TestBalancerWithNodeGroup {
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
+   * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster 
+   * to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according
+   * to replica placement policy with NodeGroup. As a result, n2 and n3 will be
+   * filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3
+   * to n0 or n1 as balancer policy with node group. Thus, we expect the balancer
+   * to end in 5 iterations without move block process.
+   */
+  @Test(timeout=60000)
+  public void testBalancerEndInNoMoveProgress() throws Exception {
+    Configuration conf = createConf();
+    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
+    String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
+    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
+    
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    assertEquals(numOfDatanodes, nodeGroups.length);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+                                .numDataNodes(capacities.length)
+                                .racks(racks)
+                                .simulatedCapacities(capacities);
+    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+    cluster = new MiniDFSClusterWithNodeGroup(builder);
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, 
+          cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+
+      long totalCapacity = TestBalancer.sum(capacities);
+      // fill up the cluster to be 60% full
+      long totalUsedSpace = totalCapacity * 6 / 10;
+      TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3, 
+          (short) (3), 0);
+
+      // run balancer which can finish in 5 iterations with no block movement.
+      runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }