You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/25 07:39:32 UTC

svn commit: r640715 - in /hadoop/core/branches/branch-0.16: CHANGES.txt src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java

Author: ddas
Date: Mon Mar 24 23:39:31 2008
New Revision: 640715

URL: http://svn.apache.org/viewvc?rev=640715&view=rev
Log:
svn merge -r 640713:640714 from trunk to 0.16 branch. Fixes HADOOP-3049.

Added:
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
      - copied unchanged from r640714, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
Modified:
    hadoop/core/branches/branch-0.16/CHANGES.txt
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java

Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=640715&r1=640714&r2=640715&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Mon Mar 24 23:39:31 2008
@@ -34,6 +34,9 @@
     HADOOP-2944. Fixes a "Run on Hadoop" wizard NPE when creating a
     Location from the wizard. (taton)
 
+    HADOOP-3049. Fixes a problem in MultithreadedMapRunner to do with
+    catching RuntimeExceptions. (Alejandro Abdelnur via ddas)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=640715&r1=640714&r2=640715&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Mon Mar 24 23:39:31 2008
@@ -45,7 +45,7 @@
  * Map implementations using this MapRunnable must be thread-safe.
  * <p>
  * The Map-Reduce job has to be configured to use this MapRunnable class (using
- * the <b>mapred.map.runner.class</b> property) and
+ * the JobConf.setMapRunnerClass method) and
  * the number of thread the thread-pool can use (using the
  * <b>mapred.map.multithreadedrunner.threads</b> property).
  * <p>
@@ -63,19 +63,20 @@
   private Mapper<K1, V1, K2, V2> mapper;
   private ExecutorService executorService;
   private volatile IOException ioException;
+  private volatile RuntimeException runtimeException;
 
   @SuppressWarnings("unchecked")
-  public void configure(JobConf job) {
+  public void configure(JobConf jobConf) {
     int numberOfThreads =
-      job.getInt("mapred.map.multithreadedrunner.threads", 10);
+      jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Configuring job " + job.getJobName() +
+      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                 " to use " + numberOfThreads + " threads");
     }
 
-    this.job = job;
-    this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
-                                                      job);
+    this.job = jobConf;
+    this.mapper = (Mapper)ReflectionUtils.newInstance(jobConf.getMapperClass(),
+        jobConf);
 
     // Creating a threadpool of the configured size to execute the Mapper
     // map method in parallel.
@@ -97,7 +98,8 @@
         // If threads are not available from the thread-pool this method
         // will block until there is a thread available.
         executorService.execute(
-                                new MapperInvokeRunable(key, value, output, reporter));
+                                new MapperInvokeRunable(key, value, output,
+                                    reporter));
 
         // Checking if a Mapper.map within a Runnable has generated an
         // IOException. If so we rethrow it to force an abort of the Map
@@ -107,6 +109,14 @@
           throw ioException;
         }
 
+        // Checking if a Mapper.map within a Runnable has generated a
+        // RuntimeException. If so we rethrow it to force an abort of the Map
+        // operation thus keeping the semantics of the default
+        // implementation.
+        if (runtimeException != null) {
+          throw runtimeException;
+        }
+
         // Allocate new key & value instances as mapper is running in parallel
         key = input.createKey();
         value = input.createValue();
@@ -130,35 +140,51 @@
                       + job.getJobName());
           }
 
+          // NOTE: while Mapper.map dispatching has concluded there are still
+          // map calls in progress.
+
           // Checking if a Mapper.map within a Runnable has generated an
           // IOException. If so we rethrow it to force an abort of the Map
           // operation thus keeping the semantics of the default
           // implementation.
-          // NOTE: while Mapper.map dispatching has concluded there are still
-          // map calls in progress.
           if (ioException != null) {
             throw ioException;
           }
+
+          // Checking if a Mapper.map within a Runnable has generated a
+          // RuntimeException. If so we rethrow it to force an abort of the Map
+          // operation thus keeping the semantics of the default
+          // implementation.
+          if (runtimeException != null) {
+            throw runtimeException;
+          }
         }
 
+        // NOTE: it could be that a map call has had an exception after the
+        // call for awaitTermination() returning true. An edge case but it
+        // could happen.
+
         // Checking if a Mapper.map within a Runnable has generated an
         // IOException. If so we rethrow it to force an abort of the Map
         // operation thus keeping the semantics of the default
         // implementation.
-        // NOTE: it could be that a map call has had an exception after the
-        // call for awaitTermination() returing true. And edge case but it
-        // could happen.
         if (ioException != null) {
           throw ioException;
         }
-      }
-      catch (IOException ioEx) {
+
+        // Checking if a Mapper.map within a Runnable has generated a
+        // RuntimeException. If so we rethrow it to force an abort of the Map
+        // operation thus keeping the semantics of the default
+        // implementation.
+        if (runtimeException != null) {
+          throw runtimeException;
+        }
+      } catch (IOException ioEx) {
         // Forcing a shutdown of all thread of the threadpool and rethrowing
         // the IOException
         executorService.shutdownNow();
         throw ioEx;
-      }
-      catch (InterruptedException iEx) {
+      } catch (InterruptedException iEx) {
         throw new IOException(iEx.getMessage());
       }
 
@@ -205,14 +231,22 @@
       try {
         // map pair to output
         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         // If there is an IOException during the call it is set in an instance
         // variable of the MultithreadedMapRunner from where it will be
         // rethrown.
         synchronized (MultithreadedMapRunner.this) {
           if (MultithreadedMapRunner.this.ioException == null) {
             MultithreadedMapRunner.this.ioException = ex;
+          }
+        }
+      } catch (RuntimeException ex) {
+        // If there is a RuntimeException during the call it is set in an
+        // instance variable of the MultithreadedMapRunner from where it will be
+        // rethrown.
+        synchronized (MultithreadedMapRunner.this) {
+          if (MultithreadedMapRunner.this.runtimeException == null) {
+            MultithreadedMapRunner.this.runtimeException = ex;
           }
         }
       }