You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/18 21:14:39 UTC

svn commit: r488404 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/Task.java src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java

Author: cutting
Date: Mon Dec 18 12:14:38 2006
New Revision: 488404

URL: http://svn.apache.org/viewvc?view=rev&rev=488404
Log:
HADOOP-811.  Add a utility, MultithreadedMapRunner.  Contributed by Alejandro Abdelnur.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=488404&r1=488403&r2=488404
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Dec 18 12:14:38 2006
@@ -111,6 +111,9 @@
 31. HADOOP-596.  Fix a bug in phase reporting during reduce.
     (Sanjay Dahiya via cutting)
 
+32. HADOOP-811.  Add a utility, MultithreadedMapRunner.
+    (Alejandro Abdelnur via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=488404&r1=488403&r2=488404
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Dec 18 12:14:38 2006
@@ -154,8 +154,10 @@
                               final Progress progress) throws IOException {
     return new Reporter() {
         public void setStatus(String status) throws IOException {
-          progress.setStatus(status);
-          progress();
+          synchronized (this) {
+            progress.setStatus(status);
+            progress();
+          }
         }
         public void progress() throws IOException {
             reportProgress(umbilical);

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?view=auto&rev=488404
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Mon Dec 18 12:14:38 2006
@@ -0,0 +1,199 @@
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.MapRunnable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Multithreaded implementation for
+ * {@link org.apache.hadoop.mapred.MapRunnable}.
+ * <p>
+ * It can be used instead of the default implementation,
+ * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not CPU
+ * bound in order to improve throughput.
+ * <p>
+ * Map implementations using this MapRunnable must be thread-safe.
+ * <p>
+ * The Map-Reduce job has to be configured to use this MapRunnable class (using
+ * the <b>mapred.map.runner.class</b> property) and
+ * the number of threads the thread-pool can use (using the
+ * <b>mapred.map.multithreadedrunner.threads</b> property, default 10).
+ * <p>
+ * The record-reading thread never has more <code>Mapper.map</code> calls in
+ * flight than there are pool threads: it blocks until a pool thread is free.
+ * The first exception or error thrown by any <code>Mapper.map</code> call
+ * (checked or unchecked) stops the dispatching of records, cancels the
+ * pending map calls and is rethrown from {@link #run}.
+ *
+ * @author Alejandro Abdelnur
+ */
+public class MultithreadedMapRunner implements MapRunnable {
+  private static final Log LOG =
+      LogFactory.getLog(MultithreadedMapRunner.class.getName());
+
+  private JobConf job;
+  private Mapper mapper;
+  private ExecutorService executorService;
+
+  // One permit per pool thread. Executors.newFixedThreadPool() queues work
+  // in an unbounded queue, so without this bound the dispatching thread would
+  // read the whole input split into memory instead of waiting for a free
+  // thread.
+  private Semaphore availableThreads;
+
+  // First failure thrown by a Mapper.map call. Written at most once (see
+  // recordFailure) by pool threads and polled by the dispatching thread.
+  private volatile Throwable mapFailure;
+
+  public void configure(JobConf job) {
+    int numberOfThreads =
+        job.getInt("mapred.map.multithreadedrunner.threads", 10);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Configuring job " + job.getJobName() +
+          " to use " + numberOfThreads + " threads" );
+    }
+
+    this.job = job;
+    this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
+                                                      job);
+
+    // Creating a threadpool of the configured size to execute the Mapper
+    // map method in parallel; the semaphore mirrors the pool size so that
+    // dispatching blocks while every pool thread is busy.
+    executorService = Executors.newFixedThreadPool(numberOfThreads);
+    availableThreads = new Semaphore(numberOfThreads);
+  }
+
+  /**
+   * Reads every record from <code>input</code>, runs <code>Mapper.map</code>
+   * on it in the thread-pool, and then waits for all map calls to finish.
+   * <p>
+   * The mapper is closed on every exit path. On failure the pool is shut down
+   * with <code>shutdownNow()</code> first, so queued map calls are discarded
+   * and running ones are interrupted. NOTE: <code>shutdownNow()</code> does
+   * not wait, so a map call that ignores interruption may still be running
+   * when the mapper is closed.
+   *
+   * @throws IOException if reading the input fails, if a map call throws an
+   *         IOException, or if the dispatching thread is interrupted
+   */
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter)
+    throws IOException {
+    boolean completed = false;
+    try {
+      dispatchAll(input, output, reporter);
+      awaitCompletion();
+      completed = true;
+    } finally {
+      if (!completed) {
+        // Stop mapping further records once the task is failing; without
+        // this the pool threads keep running queued map calls after the
+        // mapper has been closed.
+        executorService.shutdownNow();
+      }
+      mapper.close();
+    }
+  }
+
+  /**
+   * Submits one map call per input record, blocking while all pool threads
+   * are busy, and stops early if a map call has already failed.
+   */
+  private void dispatchAll(RecordReader input, OutputCollector output,
+                           Reporter reporter) throws IOException {
+    // allocate key & value instances; these objects are never reused
+    // because execution of Mapper.map is not serialized.
+    WritableComparable key = input.createKey();
+    Writable value = input.createValue();
+
+    while (input.next(key, value)) {
+      // Wait for a free pool thread; the permit is returned when the map
+      // call for this record finishes.
+      acquirePermit();
+
+      // If an earlier Mapper.map call failed, rethrow its failure to abort
+      // the Map operation instead of dispatching more records.
+      checkForMapFailure();
+
+      executorService.execute(
+          new MapperInvokeRunnable(key, value, output, reporter));
+
+      // Allocate new key & value instances as mapper is running in parallel
+      key = input.createKey();
+      value = input.createValue();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished dispatching all Mappper.map calls, job "
+          + job.getJobName());
+    }
+  }
+
+  /**
+   * Shuts the pool down gracefully and waits for every dispatched map call
+   * to finish, rethrowing the first map failure as soon as it is seen.
+   */
+  private void awaitCompletion() throws IOException {
+    // Graceful shutdown of the Threadpool; it lets all scheduled Runnables
+    // end.
+    executorService.shutdown();
+    try {
+      while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Awaiting all running Mappper.map calls to finish, job "
+              + job.getJobName());
+        }
+        // Map calls are still in progress; fail fast if one has failed.
+        checkForMapFailure();
+      }
+    } catch (InterruptedException iEx) {
+      throw interruptedIOException(iEx);
+    }
+    // A map call may have failed between the last poll and termination.
+    checkForMapFailure();
+  }
+
+  /** Blocks until a pool thread is free to run another map call. */
+  private void acquirePermit() throws IOException {
+    try {
+      availableThreads.acquire();
+    } catch (InterruptedException iEx) {
+      throw interruptedIOException(iEx);
+    }
+  }
+
+  /**
+   * Converts an interruption of the dispatching thread into an IOException.
+   * The InterruptedException is kept as the cause, and the thread's
+   * interrupt status is restored.
+   */
+  private static IOException interruptedIOException(
+      InterruptedException iEx) {
+    Thread.currentThread().interrupt();
+    IOException ioEx = new IOException(iEx.getMessage());
+    ioEx.initCause(iEx);
+    return ioEx;
+  }
+
+  /**
+   * Rethrows the first failure recorded by a Mapper.map call, if any, so
+   * that the Map operation is aborted. IOExceptions, RuntimeExceptions and
+   * Errors are rethrown as-is; any other Throwable is wrapped in an
+   * IOException.
+   */
+  private void checkForMapFailure() throws IOException {
+    Throwable failure = mapFailure;
+    if (failure == null) {
+      return;
+    }
+    if (failure instanceof IOException) {
+      throw (IOException) failure;
+    }
+    if (failure instanceof RuntimeException) {
+      throw (RuntimeException) failure;
+    }
+    if (failure instanceof Error) {
+      throw (Error) failure;
+    }
+    IOException wrapped = new IOException(failure.toString());
+    wrapped.initCause(failure);
+    throw wrapped;
+  }
+
+  /** Records <code>failure</code> unless an earlier failure is already kept. */
+  private synchronized void recordFailure(Throwable failure) {
+    if (mapFailure == null) {
+      mapFailure = failure;
+    }
+  }
+
+
+  /**
+   * Runnable to execute a single Mapper.map call from a pool thread.
+   */
+  private class MapperInvokeRunnable implements Runnable {
+    private final WritableComparable key;
+    private final Writable value;
+    private final OutputCollector output;
+    private final Reporter reporter;
+
+    /**
+     * Collects all the parameters needed to execute a Mapper.map call.
+     * The call runs later on a pool thread, so the caller must not reuse
+     * <code>key</code> or <code>value</code> afterwards.
+     *
+     * @param key record key
+     * @param value record value
+     * @param output collector shared by all map calls
+     * @param reporter reporter shared by all map calls
+     */
+    public MapperInvokeRunnable(WritableComparable key, Writable value,
+        OutputCollector output, Reporter reporter) {
+      this.key = key;
+      this.value = value;
+      this.output = output;
+      this.reporter = reporter;
+    }
+
+    /**
+     * Executes the Mapper.map call on the pool thread.
+     * <p>
+     * Any exception or error it throws is recorded so that the dispatching
+     * thread rethrows it; otherwise the pool thread would die silently and
+     * the record's output would be lost. The permit taken for this call is
+     * always released.
+     */
+    public void run() {
+      try {
+        // map pair to output
+        mapper.map(key, value, output, reporter);
+      } catch (Throwable t) {
+        recordFailure(t);
+      } finally {
+        availableThreads.release();
+      }
+    }
+  }
+
+}