You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/06/23 20:07:46 UTC

svn commit: r670696 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/hdfs/org/apache/hadoop/dfs/Balancer.java

Author: hairong
Date: Mon Jun 23 11:07:45 2008
New Revision: 670696

URL: http://svn.apache.org/viewvc?rev=670696&view=rev
Log:
Merge -r 670694:670695 to bring the change made to main by HADOOP-3487 into release 0.18.0.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=670696&r1=670695&r2=670696&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Mon Jun 23 11:07:45 2008
@@ -682,6 +682,9 @@
     HADOOP-3522. Improve documentation on reduce pointing out that
     input keys and values will be reused. (omalley)
 
+    HADOOP-3487. Balancer uses thread pools to manage its threads,
+    which provides better resource management. (hairong)
+
   BUG FIXES
 
     HADOOP-1979. Speed up fsck by adding a buffered stream. (Lohit

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java?rev=670696&r1=670695&r2=670696&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java Mon Jun 23 11:07:45 2008
@@ -42,6 +42,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -204,6 +208,13 @@
   
   private double avgUtilization = 0.0D;
   
+  final private int MOVER_THREAD_POOL_SIZE = 1000;
+  final private ExecutorService moverExecutor = 
+    Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
+  final private int DISPATCHER_THREAD_POOL_SIZE = 200;
+  final private ExecutorService dispatcherExecutor =
+    Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
+  
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
     private BalancerBlock block;
@@ -372,22 +383,13 @@
     
     /* start a thread to dispatch the block move */
     private void scheduleBlockMove() {
-      BlockMover blockMover = new BlockMover();
-      blockMover.setDaemon(true);
-      blockMover.setName("Block mover for "+ block.getBlockId() +
-          " from " + proxySource.getName() + " to " + target.getName());
-      LOG.info("Starting " + blockMover.getName());
-      blockMover.start();
-    }
-    
-    /* A thread for moving a block */
-    private class BlockMover extends Thread {
-      BlockMover() {
-      }
-      
-      public void run() {
-        dispatch();
-      }
+      moverExecutor.execute(new Runnable() {
+        public void run() {
+          LOG.info("Starting moving "+ block.getBlockId() +
+              " from " + proxySource.getName() + " to " + target.getName());
+          dispatch();
+        }
+      });
     }
   }
   
@@ -580,7 +582,7 @@
     
     /* A thread that initiates a block move 
      * and waits for block move to complete */
-    private class BlockMoveDispatcher extends Thread {
+    private class BlockMoveDispatcher implements Runnable {
       public void run() {
         dispatchBlocks();
       }
@@ -1187,25 +1189,26 @@
    * blocked if there are too many un-confirmed block moves.
    * Return the total number of bytes successfully moved in this iteration.
    */
-  private long dispatchBlockMoves() {
+  private long dispatchBlockMoves() throws InterruptedException {
     long bytesLastMoved = bytesMoved.get();
-    Source.BlockMoveDispatcher dispatchers[] =
-      new Source.BlockMoveDispatcher[sources.size()];
+    Future<?>[] futures = new Future<?>[sources.size()];
     int i=0;
     for (Source source : sources) {
-      dispatchers[i] = source.new BlockMoveDispatcher();
-      dispatchers[i].setName("Dispatcher for source " + source.getName());
-      LOG.info("Starting " + dispatchers[i].getName());
-      dispatchers[i++].start();
+      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
     }
-    for (Source.BlockMoveDispatcher dispatcher : dispatchers) {
+    
+    // wait for all dispatcher threads to finish
+    for (Future<?> future : futures) {
       try {
-        dispatcher.join();
-      } catch (InterruptedException e) {
-        LOG.info(StringUtils.stringifyException(e));
+        future.get();
+      } catch (ExecutionException e) {
+        LOG.warn("Dispatcher thread failed", e.getCause());
       }
     }
+    
+    // wait for all block moving to be done
     waitForMoveCompletion();
+    
     return bytesMoved.get()-bytesLastMoved;
   }
   
@@ -1449,6 +1452,11 @@
           " . Exiting...");
       return IO_EXCEPTION;
     } finally {
+      // shutdown thread pools
+      dispatcherExecutor.shutdownNow();
+      moverExecutor.shutdownNow();
+
+      // close the output file
       IOUtils.closeStream(out); 
       try {
         fs.delete(BALANCER_ID_PATH, true);