You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/03/28 22:15:19 UTC

svn commit: r642387 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java

Author: omalley
Date: Fri Mar 28 14:15:10 2008
New Revision: 642387

URL: http://svn.apache.org/viewvc?rev=642387&view=rev
Log:
HADOOP-3104. Limit MultithreadedMapRunner to have a fixed length queue
between the RecordReader and the map threads. Contributed by Alejandro 
Abdelnur.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=642387&r1=642386&r2=642387&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Mar 28 14:15:10 2008
@@ -465,6 +465,10 @@
     HADOOP-3103. [HOD] Hadoop.tmp.dir should not be set to cluster 
     directory. (Vinod Kumar Vavilapalli via ddas).
 
+    HADOOP-3104. Limit MultithreadedMapRunner to have a fixed length queue
+    between the RecordReader and the map threads. (Alejandro Abdelnur via
+    omalley)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=642387&r1=642386&r2=642387&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Fri Mar 28 14:15:10 2008
@@ -31,9 +31,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
@@ -46,8 +44,9 @@
  * <p>
  * The Map-Reduce job has to be configured to use this MapRunnable class (using
  * the JobConf.setMapRunnerClass method) and
- * the number of thread the thread-pool can use (using the
- * <b>mapred.map.multithreadedrunner.threads</b> property).
+ * the number of threads the thread-pool can use with the
+ * <code>mapred.map.multithreadedrunner.threads</code> property; its default
+ * value is 10 threads.
  * <p>
  */
 public class MultithreadedMapRunner<K1 extends WritableComparable,
@@ -80,7 +79,50 @@
 
     // Creating a threadpool of the configured size to execute the Mapper
     // map method in parallel.
-    executorService = Executors.newFixedThreadPool(numberOfThreads);
+    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
+                                             0L, TimeUnit.MILLISECONDS,
+                                             new BlockingArrayQueue
+                                               (numberOfThreads));
+  }
+
+  /**
+   * A blocking array queue whose offer and add methods delegate to put, which
+   * waits on a full queue instead of rejecting the new element.
+   */
+  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
+    public BlockingArrayQueue(int capacity) {
+      super(capacity);
+    }
+    public boolean offer(Runnable r) {
+      return add(r);
+    }
+    public boolean add(Runnable r) {
+      try {
+        put(r);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+      return true;
+    }
+  }
+
+  private void checkForExceptionsFromProcessingThreads()
+      throws IOException, RuntimeException {
+    // Checking if a Mapper.map within a Runnable has generated an
+    // IOException. If so we rethrow it to force an abort of the Map
+    // operation thus keeping the semantics of the default
+    // implementation.
+    if (ioException != null) {
+      throw ioException;
+    }
+
+    // Checking if a Mapper.map within a Runnable has generated a
+    // RuntimeException. If so we rethrow it to force an abort of the Map
+    // operation thus keeping the semantics of the default
+    // implementation.
+    if (runtimeException != null) {
+      throw runtimeException;
+    }
   }
 
   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
@@ -94,28 +136,10 @@
 
       while (input.next(key, value)) {
 
-        // Run Mapper.map execution asynchronously in a separate thread.
-        // If threads are not available from the thread-pool this method
-        // will block until there is a thread available.
-        executorService.execute(
-                                new MapperInvokeRunable(key, value, output,
-                                    reporter));
-
-        // Checking if a Mapper.map within a Runnable has generated an
-        // IOException. If so we rethrow it to force an abort of the Map
-        // operation thus keeping the semantics of the default
-        // implementation.
-        if (ioException != null) {
-          throw ioException;
-        }
+        executorService.execute(new MapperInvokeRunable(key, value, output,
+                                reporter));
 
-        // Checking if a Mapper.map within a Runnable has generated a
-        // RuntimeException. If so we rethrow it to force an abort of the Map
-        // operation thus keeping the semantics of the default
-        // implementation.
-        if (runtimeException != null) {
-          throw runtimeException;
-        }
+        checkForExceptionsFromProcessingThreads();
 
         // Allocate new key & value instances as mapper is running in parallel
         key = input.createKey();
@@ -141,51 +165,23 @@
           }
 
           // NOTE: while Mapper.map dispatching has concluded there are still
-          // map calls in progress.
-
-          // Checking if a Mapper.map within a Runnable has generated an
-          // IOException. If so we rethrow it to force an abort of the Map
-          // operation thus keeping the semantics of the default
-          // implementation.
-          if (ioException != null) {
-            throw ioException;
-          }
+          // map calls in progress, so exceptions may still be thrown.
+          checkForExceptionsFromProcessingThreads();
 
-          // Checking if a Mapper.map within a Runnable has generated a
-          // RuntimeException. If so we rethrow it to force an abort of the Map
-          // operation thus keeping the semantics of the default
-          // implementation.
-          if (runtimeException != null) {
-            throw runtimeException;
-          }
         }
 
         // NOTE: it could be that a map call has had an exception after the
         // call for awaitTermination() returing true. And edge case but it
         // could happen.
+        checkForExceptionsFromProcessingThreads();
 
-        // Checking if a Mapper.map within a Runnable has generated an
-        // IOException. If so we rethrow it to force an abort of the Map
-        // operation thus keeping the semantics of the default
-        // implementation.
-        if (ioException != null) {
-          throw ioException;
-        }
-
-        // Checking if a Mapper.map within a Runnable has generated a
-        // RuntimeException. If so we rethrow it to force an abort of the Map
-        // operation thus keeping the semantics of the default
-        // implementation.
-        if (runtimeException != null) {
-          throw runtimeException;
-        }
       } catch (IOException ioEx) {
         // Forcing a shutdown of all thread of the threadpool and rethrowing
         // the IOException
         executorService.shutdownNow();
         throw ioEx;
       } catch (InterruptedException iEx) {
-        throw new IOException(iEx.getMessage());
+        throw new RuntimeException(iEx);
       }
 
     } finally {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java?rev=642387&r1=642386&r2=642387&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java Fri Mar 28 14:15:10 2008
@@ -136,6 +136,11 @@
         throw new RuntimeException();
       }
       output.collect(key, value);
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
     }