You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/06/23 20:05:58 UTC
svn commit: r670695 - in /hadoop/core/trunk: CHANGES.txt
src/hdfs/org/apache/hadoop/dfs/Balancer.java
Author: hairong
Date: Mon Jun 23 11:05:57 2008
New Revision: 670695
URL: http://svn.apache.org/viewvc?rev=670695&view=rev
Log:
HADOOP-3487. Balancer uses thread pools for managing its threads. Contributed by Hairong Kuang.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/Balancer.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=670695&r1=670694&r2=670695&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun 23 11:05:57 2008
@@ -668,6 +668,9 @@
HADOOP-3522. Improve documentation on reduce pointing out that
input keys and values will be reused. (omalley)
+ HADOOP-3487. Balancer uses thread pools for managing its threads;
+ therefore provides better resource management. (hairong)
+
BUG FIXES
HADOOP-2159 Namenode stuck in safemode. The counter blockSafe should
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/Balancer.java?rev=670695&r1=670694&r2=670695&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/Balancer.java Mon Jun 23 11:05:57 2008
@@ -42,6 +42,10 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@@ -204,6 +208,13 @@
private double avgUtilization = 0.0D;
+ final private int MOVER_THREAD_POOL_SIZE = 1000;
+ final private ExecutorService moverExecutor =
+ Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
+ final private int DISPATCHER_THREAD_POOL_SIZE = 200;
+ final private ExecutorService dispatcherExecutor =
+ Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
+
/* This class keeps track of a scheduled block move */
private class PendingBlockMove {
private BalancerBlock block;
@@ -372,22 +383,13 @@
/* start a thread to dispatch the block move */
private void scheduleBlockMove() {
- BlockMover blockMover = new BlockMover();
- blockMover.setDaemon(true);
- blockMover.setName("Block mover for "+ block.getBlockId() +
- " from " + proxySource.getName() + " to " + target.getName());
- LOG.info("Starting " + blockMover.getName());
- blockMover.start();
- }
-
- /* A thread for moving a block */
- private class BlockMover extends Thread {
- BlockMover() {
- }
-
- public void run() {
- dispatch();
- }
+ moverExecutor.execute(new Runnable() {
+ public void run() {
+ LOG.info("Starting moving "+ block.getBlockId() +
+ " from " + proxySource.getName() + " to " + target.getName());
+ dispatch();
+ }
+ });
}
}
@@ -580,7 +582,7 @@
/* A thread that initiates a block move
* and waits for block move to complete */
- private class BlockMoveDispatcher extends Thread {
+ private class BlockMoveDispatcher implements Runnable {
public void run() {
dispatchBlocks();
}
@@ -1187,25 +1189,26 @@
* blocked if there are too many un-confirmed block moves.
* Return the total number of bytes successfully moved in this iteration.
*/
- private long dispatchBlockMoves() {
+ private long dispatchBlockMoves() throws InterruptedException {
long bytesLastMoved = bytesMoved.get();
- Source.BlockMoveDispatcher dispatchers[] =
- new Source.BlockMoveDispatcher[sources.size()];
+ Future<?>[] futures = new Future<?>[sources.size()];
int i=0;
for (Source source : sources) {
- dispatchers[i] = source.new BlockMoveDispatcher();
- dispatchers[i].setName("Dispatcher for source " + source.getName());
- LOG.info("Starting " + dispatchers[i].getName());
- dispatchers[i++].start();
+ futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
}
- for (Source.BlockMoveDispatcher dispatcher : dispatchers) {
+
+ // wait for all dispatcher threads to finish
+ for (Future<?> future : futures) {
try {
- dispatcher.join();
- } catch (InterruptedException e) {
- LOG.info(StringUtils.stringifyException(e));
+ future.get();
+ } catch (ExecutionException e) {
+ LOG.warn("Dispatcher thread failed", e.getCause());
}
}
+
+ // wait for all block moving to be done
waitForMoveCompletion();
+
return bytesMoved.get()-bytesLastMoved;
}
@@ -1449,6 +1452,11 @@
" . Exiting...");
return IO_EXCEPTION;
} finally {
+ // shutdown thread pools
+ dispatcherExecutor.shutdownNow();
+ moverExecutor.shutdownNow();
+
+ // close the output file
IOUtils.closeStream(out);
try {
fs.delete(BALANCER_ID_PATH, true);