You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/18 21:14:39 UTC
svn commit: r488404 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/Task.java
src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
Author: cutting
Date: Mon Dec 18 12:14:38 2006
New Revision: 488404
URL: http://svn.apache.org/viewvc?view=rev&rev=488404
Log:
HADOOP-811. Add a utility, MultithreadedMapRunner. Contributed by Alejandro Abdelnur.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=488404&r1=488403&r2=488404
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Dec 18 12:14:38 2006
@@ -111,6 +111,9 @@
31. HADOOP-596. Fix a bug in phase reporting during reduce.
(Sanjay Dahiya via cutting)
+32. HADOOP-811. Add a utility, MultithreadedMapRunner.
+ (Alejandro Abdelnur via cutting)
+
Release 0.9.2 - 2006-12-15
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=488404&r1=488403&r2=488404
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Dec 18 12:14:38 2006
@@ -154,8 +154,10 @@
final Progress progress) throws IOException {
return new Reporter() {
public void setStatus(String status) throws IOException {
- progress.setStatus(status);
- progress();
+ synchronized (this) {
+ progress.setStatus(status);
+ progress();
+ }
}
public void progress() throws IOException {
reportProgress(umbilical);
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?view=auto&rev=488404
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Mon Dec 18 12:14:38 2006
@@ -0,0 +1,260 @@
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.MapRunnable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Multithreaded implementation of {@link org.apache.hadoop.mapred.MapRunnable}.
+ * <p>
+ * It can be used instead of the default implementation,
+ * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not
+ * CPU bound, in order to improve throughput.
+ * <p>
+ * Mapper implementations used with this MapRunnable must be thread-safe: a
+ * single Mapper instance is shared by all pool threads, as are the
+ * OutputCollector and Reporter passed to run().
+ * <p>
+ * The Map-Reduce job has to be configured to use this MapRunnable class (using
+ * the <b>mapred.map.runner.class</b> property) and the number of threads the
+ * thread-pool can use (using the <b>mapred.map.multithreadedrunner.threads</b>
+ * property, default 10). At most that many records are queued ahead of the
+ * running map calls, so input is not read faster than it can be mapped.
+ * <p>
+ * If a map call fails, its exception is rethrown from run(), queued map calls
+ * are discarded and running ones are interrupted.
+ *
+ * @author Alejandro Abdelnur
+ */
+public class MultithreadedMapRunner implements MapRunnable {
+  private static final Log LOG =
+    LogFactory.getLog(MultithreadedMapRunner.class.getName());
+
+  private JobConf job;
+  private Mapper mapper;
+  private ExecutorService executorService;
+
+  /** First failure raised by a Mapper.map call; rethrown by run(). */
+  private volatile Throwable mapFailure;
+
+  /**
+   * Creates the Mapper and the fixed-size thread-pool used to run it.
+   *
+   * @param job job configuration; <b>mapred.map.multithreadedrunner.threads</b>
+   *            sets the number of threads (default 10)
+   */
+  public void configure(JobConf job) {
+    int numberOfThreads =
+      job.getInt("mapred.map.multithreadedrunner.threads", 10);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Configuring job " + job.getJobName() +
+                " to use " + numberOfThreads + " threads" );
+    }
+
+    this.job = job;
+    this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
+                                                      job);
+
+    // Fixed-size thread-pool with a bounded work queue. The unbounded queue
+    // of Executors.newFixedThreadPool() never makes execute() block, so all
+    // records of the input could end up buffered in memory; with a bounded
+    // queue and BlockingSubmitPolicy the dispatching thread waits instead.
+    executorService = new ThreadPoolExecutor(
+        numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(numberOfThreads),
+        Executors.defaultThreadFactory(), new BlockingSubmitPolicy());
+  }
+
+  /**
+   * Reads every record from the input and runs Mapper.map on it in the
+   * thread-pool, then waits for all map calls to finish. The Mapper is
+   * closed before returning, whether or not the run succeeded.
+   *
+   * @param input source of the records to map
+   * @param output collector shared by all map calls
+   * @param reporter reporter shared by all map calls
+   * @throws IOException if reading the input or a map call fails, or if the
+   *         thread is interrupted while dispatching or waiting
+   */
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter)
+    throws IOException {
+    // Set once the pool has terminated normally; otherwise the finally
+    // block stops the pool threads.
+    boolean poolTerminated = false;
+    try {
+      // Allocate key & value instances; these objects are not reused
+      // because execution of Mapper.map is not serialized.
+      WritableComparable key = input.createKey();
+      Writable value = input.createValue();
+
+      while (input.next(key, value)) {
+        // Run Mapper.map asynchronously in a pool thread. If the work queue
+        // is full this call blocks until a slot becomes available.
+        executorService.execute(
+            new MapperInvokeRunnable(key, value, output, reporter));
+
+        // Abort as soon as a map call has failed, keeping the semantics of
+        // the default implementation.
+        checkForMapFailure();
+
+        // Allocate new key & value instances as mapper is running in parallel
+        key = input.createKey();
+        value = input.createValue();
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Finished dispatching all Mapper.map calls, job "
+                  + job.getJobName());
+      }
+
+      // Graceful shutdown of the thread-pool: already scheduled Runnables
+      // are allowed to finish.
+      executorService.shutdown();
+
+      // Wait for all Runnables to end, failing fast if a map call that is
+      // still in progress raises an exception.
+      while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Awaiting all running Mapper.map calls to finish, job "
+                    + job.getJobName());
+        }
+        checkForMapFailure();
+      }
+      poolTerminated = true;
+
+      // A map call may have failed after the last check in the loop.
+      checkForMapFailure();
+    } catch (InterruptedException iEx) {
+      // Interrupted while waiting for the pool to terminate: keep the
+      // interrupt status and the original cause.
+      Thread.currentThread().interrupt();
+      IOException ioEx = new IOException(
+          "Interrupted while waiting for Mapper.map calls, job "
+          + job.getJobName());
+      ioEx.initCause(iEx);
+      throw ioEx;
+    } catch (RejectedExecutionException reEx) {
+      // Raised by BlockingSubmitPolicy, e.g. when interrupted while waiting
+      // for room in the work queue.
+      IOException ioEx = new IOException(
+          "Could not dispatch Mapper.map call, job " + job.getJobName());
+      ioEx.initCause(reEx);
+      throw ioEx;
+    } finally {
+      if (!poolTerminated) {
+        // Failure path, including exceptions from the RecordReader: drop
+        // queued map calls and interrupt running ones so the pool threads
+        // exit instead of lingering. Map calls that ignore interruption may
+        // still be running when the Mapper is closed below.
+        executorService.shutdownNow();
+      }
+      mapper.close();
+    }
+  }
+
+  /**
+   * Rethrows the first failure recorded by a Mapper.map call, if any.
+   * IOExceptions, RuntimeExceptions and Errors are rethrown unchanged; any
+   * other Throwable is wrapped in an IOException.
+   */
+  private void checkForMapFailure() throws IOException {
+    Throwable failure = mapFailure;
+    if (failure == null) {
+      return;
+    }
+    if (failure instanceof IOException) {
+      throw (IOException) failure;
+    }
+    if (failure instanceof RuntimeException) {
+      throw (RuntimeException) failure;
+    }
+    if (failure instanceof Error) {
+      throw (Error) failure;
+    }
+    IOException ioEx = new IOException("Mapper.map failed: " + failure);
+    ioEx.initCause(failure);
+    throw ioEx;
+  }
+
+  /**
+   * Rejection policy that makes execute() wait for room in the bounded work
+   * queue instead of rejecting the task, throttling the dispatching thread.
+   */
+  private static class BlockingSubmitPolicy
+      implements RejectedExecutionHandler {
+    public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
+      if (pool.isShutdown()) {
+        throw new RejectedExecutionException("Thread-pool is shut down");
+      }
+      try {
+        pool.getQueue().put(task);
+      } catch (InterruptedException iEx) {
+        Thread.currentThread().interrupt();
+        throw new RejectedExecutionException(
+            "Interrupted while waiting to queue a Mapper.map call", iEx);
+      }
+    }
+  }
+
+  /**
+   * Runnable to execute a single Mapper.map call from a pool thread.
+   */
+  private class MapperInvokeRunnable implements Runnable {
+    private final WritableComparable key;
+    private final Writable value;
+    private final OutputCollector output;
+    private final Reporter reporter;
+
+    /**
+     * Collects all parameters required to execute a Mapper.map call.
+     *
+     * @param key key of the record to map
+     * @param value value of the record to map
+     * @param output collector receiving the map output
+     * @param reporter reporter passed to Mapper.map
+     */
+    public MapperInvokeRunnable(WritableComparable key, Writable value,
+                                OutputCollector output, Reporter reporter) {
+      this.key = key;
+      this.value = value;
+      this.output = output;
+      this.reporter = reporter;
+    }
+
+    /**
+     * Executes Mapper.map with the captured parameters. Runs on a pool
+     * thread, so any failure is recorded (the first one wins) for run() to
+     * rethrow, rather than escaping to the pool thread where run() never sees it.
+     */
+    public void run() {
+      try {
+        mapper.map(key, value, output, reporter);
+      } catch (Throwable t) {
+        synchronized (MultithreadedMapRunner.this) {
+          if (mapFailure == null) {
+            mapFailure = t;
+          }
+        }
+      }
+    }
+  }
+
+}