You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/04/26 02:55:50 UTC
svn commit: r1476006 - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-comm...
Author: szetszwo
Date: Fri Apr 26 00:55:45 2013
New Revision: 1476006
URL: http://svn.apache.org/r1476006
Log:
Merge r1471229 through r1476005 from trunk.
Added:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java
- copied unchanged from r1476005, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java
Modified:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1471229-1476005
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Fri Apr 26 00:55:45 2013
@@ -209,6 +209,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs
anyways after YARN-561. (Xuan Gong via vinodkv)
+ MAPREDUCE-5069. add concrete common implementations of
+ CombineFileInputFormat (Sangjin Lee via bobby)
+
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
@@ -336,6 +339,23 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5146. application classloader may be used too early to load
classes. (Sangjin Lee via tomwhite)
+ MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+ with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
+ cleanup is now called even if there is an error. The old mapred API
+ already ensures that Mapper.close and Reducer.close are invoked during
+ error handling. Note that it is an incompatible change, however end-users
+ can override Mapper.run and Reducer.run to get the old (inconsistent)
+ behaviour. (acmurthy)
+
+ MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
+ synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
+
+ MAPREDUCE-5181. RMCommunicator should not use AMToken from the env.
+ (Vinod Kumar Vavilapalli via sseth)
+
+ MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
+ YARN-577. (Hitesh Shah via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1471229-1476005
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1471229-1476005
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Apr 26 00:55:45 2013
@@ -38,13 +38,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -280,24 +276,7 @@ public abstract class RMCommunicator ext
throw new YarnException(e);
}
- if (UserGroupInformation.isSecurityEnabled()) {
- String tokenURLEncodedStr = System.getenv().get(
- ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
- Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
- try {
- token.decodeFromUrlString(tokenURLEncodedStr);
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- SecurityUtil.setTokenService(token, serviceAddr);
- if (LOG.isDebugEnabled()) {
- LOG.debug("AppMasterToken is " + token);
- }
- currentUser.addToken(token);
- }
-
+ // CurrentUser should already have AMToken loaded.
return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
@Override
public AMRMProtocol run() {
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Apr 26 00:55:45 2013
@@ -18,6 +18,10 @@
package org.apache.hadoop.mapred;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -86,8 +90,6 @@ public class LocalJobRunner implements C
private static final String jobDir = "localRunner/";
- private static final Counters EMPTY_COUNTERS = new Counters();
-
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
@@ -273,10 +275,10 @@ public class LocalJobRunner implements C
this.partialMapProgress = new float[numMaps];
this.mapCounters = new Counters[numMaps];
for (int i = 0; i < numMaps; i++) {
- this.mapCounters[i] = EMPTY_COUNTERS;
+ this.mapCounters[i] = new Counters();
}
- this.reduceCounters = EMPTY_COUNTERS;
+ this.reduceCounters = new Counters();
}
/**
@@ -497,6 +499,15 @@ public class LocalJobRunner implements C
public synchronized boolean statusUpdate(TaskAttemptID taskId,
TaskStatus taskStatus) throws IOException, InterruptedException {
+ // Serialize as we would if distributed in order to make deep copy
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ taskStatus.write(dos);
+ dos.close();
+ taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap());
+ taskStatus.readFields(new DataInputStream(
+ new ByteArrayInputStream(baos.toByteArray())));
+
LOG.info(taskStatus.getStateString());
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@@ -525,10 +536,10 @@ public class LocalJobRunner implements C
public synchronized Counters getCurrentCounters() {
if (null == mapCounters) {
// Counters not yet initialized for job.
- return EMPTY_COUNTERS;
+ return new Counters();
}
- Counters current = EMPTY_COUNTERS;
+ Counters current = new Counters();
for (Counters c : mapCounters) {
current = Counters.sum(current, c);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Fri Apr 26 00:55:45 2013
@@ -434,10 +434,15 @@ public class MapTask extends Task {
}
statusUpdate(umbilical);
collector.flush();
- } finally {
- //close
- in.close(); // close input
+
+ in.close();
+ in = null;
+
collector.close();
+ collector = null;
+ } finally {
+ closeQuietly(in);
+ closeQuietly(collector);
}
}
@@ -753,13 +758,20 @@ public class MapTask extends Task {
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
- input.initialize(split, mapperContext);
- mapper.run(mapperContext);
- mapPhase.complete();
- setPhase(TaskStatus.Phase.SORT);
- statusUpdate(umbilical);
- input.close();
- output.close(mapperContext);
+ try {
+ input.initialize(split, mapperContext);
+ mapper.run(mapperContext);
+ mapPhase.complete();
+ setPhase(TaskStatus.Phase.SORT);
+ statusUpdate(umbilical);
+ input.close();
+ input = null;
+ output.close(mapperContext);
+ output = null;
+ } finally {
+ closeQuietly(input);
+ closeQuietly(output, mapperContext);
+ }
}
class DirectMapOutputCollector<K, V>
@@ -1949,4 +1961,55 @@ public class MapTask extends Task {
}
}
+ private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (IOException ie) {
+ // Ignore
+ LOG.info("Ignoring exception during close for " + c, ie);
+ }
+ }
+ }
+
+ private <OUTKEY, OUTVALUE>
+ void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Exception ie) {
+ // Ignore
+ LOG.info("Ignoring exception during close for " + c, ie);
+ }
+ }
+ }
+
+ private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+ void closeQuietly(
+ org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Exception ie) {
+ // Ignore
+ LOG.info("Ignoring exception during close for " + c, ie);
+ }
+ }
+ }
+
+ private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+ void closeQuietly(
+ org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+ org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+ mapperContext) {
+ if (c != null) {
+ try {
+ c.close(mapperContext);
+ } catch (Exception ie) {
+ // Ignore
+ LOG.info("Ignoring exception during close for " + c, ie);
+ }
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Fri Apr 26 00:55:45 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
@@ -428,14 +429,15 @@ public class ReduceTask extends Task {
// make output collector
String finalName = getOutputName(getPartition());
- final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+ RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
this, job, reporter, finalName);
-
+ final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
+
OutputCollector<OUTKEY,OUTVALUE> collector =
new OutputCollector<OUTKEY,OUTVALUE>() {
public void collect(OUTKEY key, OUTVALUE value)
throws IOException {
- out.write(key, value);
+ finalOut.write(key, value);
// indicate that progress update needs to be sent
reporter.progress();
}
@@ -466,20 +468,14 @@ public class ReduceTask extends Task {
values.informReduceProgress();
}
- //Clean up: repeated in catch block below
reducer.close();
- out.close(reporter);
- //End of clean up.
- } catch (IOException ioe) {
- try {
- reducer.close();
- } catch (IOException ignored) {}
-
- try {
- out.close(reporter);
- } catch (IOException ignored) {}
+ reducer = null;
- throw ioe;
+ out.close(reporter);
+ out = null;
+ } finally {
+ IOUtils.cleanup(LOG, reducer);
+ closeQuietly(out, reporter);
}
}
@@ -645,7 +641,21 @@ public class ReduceTask extends Task {
committer,
reporter, comparator, keyClass,
valueClass);
- reducer.run(reducerContext);
- trackedRW.close(reducerContext);
+ try {
+ reducer.run(reducerContext);
+ } finally {
+ trackedRW.close(reducerContext);
+ }
+ }
+
+ private <OUTKEY, OUTVALUE>
+ void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+ if (c != null) {
+ try {
+ c.close(r);
+ } catch (Exception e) {
+ LOG.info("Exception in closing " + c, e);
+ }
+ }
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java Fri Apr 26 00:55:45 2013
@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYO
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
- while (context.nextKeyValue()) {
- map(context.getCurrentKey(), context.getCurrentValue(), context);
+ try {
+ while (context.nextKeyValue()) {
+ map(context.getCurrentKey(), context.getCurrentValue(), context);
+ }
+ } finally {
+ cleanup(context);
}
- cleanup(context);
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java Fri Apr 26 00:55:45 2013
@@ -166,14 +166,17 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
- while (context.nextKey()) {
- reduce(context.getCurrentKey(), context.getValues(), context);
- // If a back up store is used, reset it
- Iterator<VALUEIN> iter = context.getValues().iterator();
- if(iter instanceof ReduceContext.ValueIterator) {
- ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
+ try {
+ while (context.nextKey()) {
+ reduce(context.getCurrentKey(), context.getValues(), context);
+ // If a back up store is used, reset it
+ Iterator<VALUEIN> iter = context.getValues().iterator();
+ if(iter instanceof ReduceContext.ValueIterator) {
+ ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
+ }
}
+ } finally {
+ cleanup(context);
}
- cleanup(context);
}
}
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1471229-1476005
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Fri Apr 26 00:55:45 2013
@@ -89,7 +89,7 @@ public class NotRunningJob implements MR
// used for a non running job
return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
- "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
+ "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f);
}
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1476006&r1=1476005&r2=1476006&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Fri Apr 26 00:55:45 2013
@@ -413,7 +413,7 @@ public class TestClientServiceDelegate {
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
"appname", "host", 124, null, YarnApplicationState.FINISHED,
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
- "N/A");
+ "N/A", 0.0f);
}
private ApplicationReport getRunningApplicationReport(String host, int port) {
@@ -423,7 +423,7 @@ public class TestClientServiceDelegate {
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
"appname", host, port, null, YarnApplicationState.RUNNING,
"diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null,
- "N/A");
+ "N/A", 0.0f);
}
private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {