You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/03/28 22:18:02 UTC
svn commit: r642390 - in /hadoop/core/branches/branch-0.16: CHANGES.txt
src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
Author: omalley
Date: Fri Mar 28 14:18:01 2008
New Revision: 642390
URL: http://svn.apache.org/viewvc?rev=642390&view=rev
Log:
Merge -r 642386:642387 from trunk to branch 0.16 to fix HADOOP-3104.
Modified:
hadoop/core/branches/branch-0.16/CHANGES.txt
hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=642390&r1=642389&r2=642390&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Fri Mar 28 14:18:01 2008
@@ -60,6 +60,10 @@
HADOOP-3103. [HOD] Hadoop.tmp.dir should not be set to cluster
directory. (Vinod Kumar Vavilapalli via ddas)
+ HADOOP-3104. Limit MultithreadedMapRunner to have a fixed length queue
+ between the RecordReader and the map threads. (Alejandro Abdelnur via
+ omalley)
+
Release 0.16.1 - 2008-03-13
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=642390&r1=642389&r2=642390&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Fri Mar 28 14:18:01 2008
@@ -31,9 +31,7 @@
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* Multithreaded implementation for {@link org.apache.hadoop.mapred.MapRunnable}.
@@ -46,8 +44,9 @@
* <p>
* The Map-Reduce job has to be configured to use this MapRunnable class (using
* the JobConf.setMapRunnerClass method) and
- * the number of thread the thread-pool can use (using the
- * <b>mapred.map.multithreadedrunner.threads</b> property).
+ * the number of threads the thread-pool can use with the
+ * <code>mapred.map.multithreadedrunner.threads</code> property; its default
+ * value is 10 threads.
* <p>
*/
public class MultithreadedMapRunner<K1 extends WritableComparable,
@@ -80,7 +79,50 @@
// Creating a threadpool of the configured size to execute the Mapper
// map method in parallel.
- executorService = Executors.newFixedThreadPool(numberOfThreads);
+ executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new BlockingArrayQueue
+ (numberOfThreads));
+ }
+
+ /**
+  * A bounded queue of {@link Runnable}s whose {@link #offer(Runnable)} and
+  * {@link #add(Runnable)} wait for free space, as <code>put</code> does,
+  * instead of failing immediately when the queue is full.
+  * <p>
+  * If the calling thread is interrupted while waiting, the element is NOT
+  * enqueued: the interrupt status is restored, <code>offer</code> returns
+  * <code>false</code> and <code>add</code> throws an
+  * <code>IllegalStateException</code>, so a caller never mistakes a dropped
+  * element for an accepted one.
+  */
+ private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
+
+   /**
+    * @param capacity the maximum number of elements the queue can hold
+    */
+   public BlockingArrayQueue(int capacity) {
+     super(capacity);
+   }
+
+   /**
+    * Inserts the element, waiting for space if the queue is full.
+    *
+    * @param r the element to insert
+    * @return <code>true</code> once the element has been enqueued;
+    *         <code>false</code> if the wait was interrupted and the element
+    *         was therefore not enqueued
+    */
+   @Override
+   public boolean offer(Runnable r) {
+     try {
+       put(r);
+       return true;
+     } catch (InterruptedException ie) {
+       // Keep the interrupt visible to the caller and report the failure
+       // rather than pretending the element was accepted.
+       Thread.currentThread().interrupt();
+       return false;
+     }
+   }
+
+   /**
+    * Inserts the element, waiting for space if the queue is full.
+    *
+    * @param r the element to insert
+    * @return <code>true</code> once the element has been enqueued
+    * @throws IllegalStateException if the wait was interrupted, in which
+    *         case the element was not enqueued
+    */
+   @Override
+   public boolean add(Runnable r) {
+     if (!offer(r)) {
+       throw new IllegalStateException(
+           "Interrupted while waiting for space in the queue");
+     }
+     return true;
+   }
+ }
+
+ /**
+  * Rethrows the exception held in the <code>ioException</code> or
+  * <code>runtimeException</code> field, if one is set. The
+  * <code>ioException</code> field is examined first.
+  *
+  * @throws IOException if <code>ioException</code> is not null
+  * @throws RuntimeException if <code>runtimeException</code> is not null
+  */
+ private void checkForExceptionsFromProcessingThreads()
+     throws IOException, RuntimeException {
+   // An IOException generated by a Mapper.map call within a Runnable is
+   // rethrown here to force an abort of the Map operation, keeping the
+   // semantics of the default implementation.
+   if (null != ioException) {
+     throw ioException;
+   }
+
+   // Likewise for a RuntimeException generated by a Mapper.map call within
+   // a Runnable: rethrow it to abort the Map operation as the default
+   // implementation would.
+   if (null != runtimeException) {
+     throw runtimeException;
+   }
}
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
@@ -94,28 +136,10 @@
while (input.next(key, value)) {
- // Run Mapper.map execution asynchronously in a separate thread.
- // If threads are not available from the thread-pool this method
- // will block until there is a thread available.
- executorService.execute(
- new MapperInvokeRunable(key, value, output,
- reporter));
-
- // Checking if a Mapper.map within a Runnable has generated an
- // IOException. If so we rethrow it to force an abort of the Map
- // operation thus keeping the semantics of the default
- // implementation.
- if (ioException != null) {
- throw ioException;
- }
+ executorService.execute(new MapperInvokeRunable(key, value, output,
+ reporter));
- // Checking if a Mapper.map within a Runnable has generated a
- // RuntimeException. If so we rethrow it to force an abort of the Map
- // operation thus keeping the semantics of the default
- // implementation.
- if (runtimeException != null) {
- throw runtimeException;
- }
+ checkForExceptionsFromProcessingThreads();
// Allocate new key & value instances as mapper is running in parallel
key = input.createKey();
@@ -141,51 +165,23 @@
}
// NOTE: while Mapper.map dispatching has concluded there are still
- // map calls in progress.
-
- // Checking if a Mapper.map within a Runnable has generated an
- // IOException. If so we rethrow it to force an abort of the Map
- // operation thus keeping the semantics of the default
- // implementation.
- if (ioException != null) {
- throw ioException;
- }
+ // map calls in progress, and they may still throw exceptions.
+ checkForExceptionsFromProcessingThreads();
- // Checking if a Mapper.map within a Runnable has generated a
- // RuntimeException. If so we rethrow it to force an abort of the Map
- // operation thus keeping the semantics of the default
- // implementation.
- if (runtimeException != null) {
- throw runtimeException;
- }
}
// NOTE: it could be that a map call has had an exception after the
// call for awaitTermination() returning true. An edge case but it
// could happen.
+ checkForExceptionsFromProcessingThreads();
- // Checking if a Mapper.map within a Runnable has generated an
- // IOException. If so we rethrow it to force an abort of the Map
- // operation thus keeping the semantics of the default
- // implementation.
- if (ioException != null) {
- throw ioException;
- }
-
- // Checking if a Mapper.map within a Runnable has generated a
- // RuntimeException. If so we rethrow it to force an abort of the Map
- // operation thus keeping the semantics of the default
- // implementation.
- if (runtimeException != null) {
- throw runtimeException;
- }
} catch (IOException ioEx) {
// Forcing a shutdown of all threads of the thread pool and rethrowing
// the IOException
executorService.shutdownNow();
throw ioEx;
} catch (InterruptedException iEx) {
- throw new IOException(iEx.getMessage());
+ throw new RuntimeException(iEx);
}
} finally {
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java?rev=642390&r1=642389&r2=642390&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java Fri Mar 28 14:18:01 2008
@@ -137,6 +137,11 @@
throw new RuntimeException();
}
output.collect(key, value);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
}