You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/25 07:39:32 UTC
svn commit: r640715 - in /hadoop/core/branches/branch-0.16: CHANGES.txt
src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
Author: ddas
Date: Mon Mar 24 23:39:31 2008
New Revision: 640715
URL: http://svn.apache.org/viewvc?rev=640715&view=rev
Log:
svn merge -r 640713:640714 from trunk to 0.16 branch. Fixes HADOOP-3049.
Added:
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
- copied unchanged from r640714, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
Modified:
hadoop/core/branches/branch-0.16/CHANGES.txt
hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=640715&r1=640714&r2=640715&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Mon Mar 24 23:39:31 2008
@@ -34,6 +34,9 @@
HADOOP-2944. Fixes a "Run on Hadoop" wizard NPE when creating a
Location from the wizard. (taton)
+ HADOOP-3049. Fixes a problem in MultithreadedMapRunner to do with
+ catching RuntimeExceptions. (Alejandro Abdelnur via ddas)
+
Release 0.16.1 - 2008-03-13
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=640715&r1=640714&r2=640715&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Mon Mar 24 23:39:31 2008
@@ -45,7 +45,7 @@
* Map implementations using this MapRunnable must be thread-safe.
* <p>
* The Map-Reduce job has to be configured to use this MapRunnable class (using
- * the <b>mapred.map.runner.class</b> property) and
+ * the JobConf.setMapRunnerClass method) and
* the number of threads the thread-pool can use (using the
* <b>mapred.map.multithreadedrunner.threads</b> property).
* <p>
@@ -63,19 +63,20 @@
private Mapper<K1, V1, K2, V2> mapper;
private ExecutorService executorService;
private volatile IOException ioException;
+ private volatile RuntimeException runtimeException;
@SuppressWarnings("unchecked")
- public void configure(JobConf job) {
+ public void configure(JobConf jobConf) {
int numberOfThreads =
- job.getInt("mapred.map.multithreadedrunner.threads", 10);
+ jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
if (LOG.isDebugEnabled()) {
- LOG.debug("Configuring job " + job.getJobName() +
+ LOG.debug("Configuring jobConf " + jobConf.getJobName() +
" to use " + numberOfThreads + " threads");
}
- this.job = job;
- this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
- job);
+ this.job = jobConf;
+ this.mapper = (Mapper)ReflectionUtils.newInstance(jobConf.getMapperClass(),
+ jobConf);
// Creating a threadpool of the configured size to execute the Mapper
// map method in parallel.
@@ -97,7 +98,8 @@
// If threads are not available from the thread-pool this method
// will block until there is a thread available.
executorService.execute(
- new MapperInvokeRunable(key, value, output, reporter));
+ new MapperInvokeRunable(key, value, output,
+ reporter));
// Checking if a Mapper.map within a Runnable has generated an
// IOException. If so we rethrow it to force an abort of the Map
@@ -107,6 +109,14 @@
throw ioException;
}
+ // Checking if a Mapper.map within a Runnable has generated a
+ // RuntimeException. If so we rethrow it to force an abort of the Map
+ // operation thus keeping the semantics of the default
+ // implementation.
+ if (runtimeException != null) {
+ throw runtimeException;
+ }
+
// Allocate new key & value instances as mapper is running in parallel
key = input.createKey();
value = input.createValue();
@@ -130,35 +140,51 @@
+ job.getJobName());
}
+ // NOTE: while Mapper.map dispatching has concluded there are still
+ // map calls in progress.
+
// Checking if a Mapper.map within a Runnable has generated an
// IOException. If so we rethrow it to force an abort of the Map
// operation thus keeping the semantics of the default
// implementation.
- // NOTE: while Mapper.map dispatching has concluded there are still
- // map calls in progress.
if (ioException != null) {
throw ioException;
}
+
+ // Checking if a Mapper.map within a Runnable has generated a
+ // RuntimeException. If so we rethrow it to force an abort of the Map
+ // operation thus keeping the semantics of the default
+ // implementation.
+ if (runtimeException != null) {
+ throw runtimeException;
+ }
}
+ // NOTE: it could be that a map call has had an exception after the
+ // call to awaitTermination() returned true. An edge case, but it
+ // could happen.
+
// Checking if a Mapper.map within a Runnable has generated an
// IOException. If so we rethrow it to force an abort of the Map
// operation thus keeping the semantics of the default
// implementation.
- // NOTE: it could be that a map call has had an exception after the
- // call for awaitTermination() returing true. And edge case but it
- // could happen.
if (ioException != null) {
throw ioException;
}
- }
- catch (IOException ioEx) {
+
+ // Checking if a Mapper.map within a Runnable has generated a
+ // RuntimeException. If so we rethrow it to force an abort of the Map
+ // operation thus keeping the semantics of the default
+ // implementation.
+ if (runtimeException != null) {
+ throw runtimeException;
+ }
+ } catch (IOException ioEx) {
// Forcing a shutdown of all threads of the thread pool and rethrowing
// the IOException
executorService.shutdownNow();
throw ioEx;
- }
- catch (InterruptedException iEx) {
+ } catch (InterruptedException iEx) {
throw new IOException(iEx.getMessage());
}
@@ -205,14 +231,22 @@
try {
// map pair to output
MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
// If there is an IOException during the call it is set in an instance
// variable of the MultithreadedMapRunner from where it will be
// rethrown.
synchronized (MultithreadedMapRunner.this) {
if (MultithreadedMapRunner.this.ioException == null) {
MultithreadedMapRunner.this.ioException = ex;
+ }
+ }
+ } catch (RuntimeException ex) {
+ // If there is a RuntimeException during the call it is set in an
+ // instance variable of the MultithreadedMapRunner from where it will be
+ // rethrown.
+ synchronized (MultithreadedMapRunner.this) {
+ if (MultithreadedMapRunner.this.runtimeException == null) {
+ MultithreadedMapRunner.this.runtimeException = ex;
}
}
}