Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/10 14:33:34 UTC

flink git commit: [FLINK-2617] [hadoop-compat] Added static mutexes for configure, open, close HadoopFormats

Repository: flink
Updated Branches:
  refs/heads/release-0.9 ae2b59d83 -> d656fc33a


[FLINK-2617] [hadoop-compat] Added static mutexes for configure, open, close HadoopFormats

This closes #1111


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d656fc33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d656fc33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d656fc33

Branch: refs/heads/release-0.9
Commit: d656fc33a28dbcc286723b1f7413e297b1407add
Parents: ae2b59d
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 9 14:32:21 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 10 14:32:51 2015 +0200

----------------------------------------------------------------------
 .../hadoop/mapred/HadoopInputFormatBase.java    |  46 ++++--
 .../hadoop/mapred/HadoopOutputFormatBase.java   | 101 +++++++------
 .../hadoop/mapreduce/HadoopInputFormatBase.java |  55 ++++---
 .../mapreduce/HadoopOutputFormatBase.java       | 142 +++++++++++--------
 4 files changed, 212 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
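The core of the change is the same in all four classes: three class-level locks serialize the configure(), open(), and close() life-cycle calls across all format instances running in the same JVM. Hadoop formats may rely on the per-JVM isolation they get from Hadoop's process-based parallelism, while Flink runs parallel subtasks as threads of a single JVM. A minimal, hypothetical sketch of the pattern (the class name GuardedHadoopFormat and the empty method bodies are invented for illustration; the real code is in the diffs below):

    // Sketch only: static mutexes shared by every instance in the JVM, one per life-cycle phase.
    public abstract class GuardedHadoopFormat {

        private static final Object OPEN_MUTEX = new Object();
        private static final Object CONFIGURE_MUTEX = new Object();
        private static final Object CLOSE_MUTEX = new Object();

        public void configure() {
            synchronized (CONFIGURE_MUTEX) {
                // hand the JobConf / Configuration to the wrapped Hadoop format
            }
        }

        public void open() {
            synchronized (OPEN_MUTEX) {
                // create and initialize the Hadoop RecordReader / RecordWriter
            }
        }

        public void close() {
            synchronized (CLOSE_MUTEX) {
                // close the Hadoop RecordReader / RecordWriter and commit if needed
            }
        }
    }

Because the mutexes are static, the locks are held per class (per classloader), not per instance, so two different format instances in the same task manager JVM cannot run the guarded phases concurrently.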


http://git-wip-us.apache.org/repos/asf/flink/blob/d656fc33/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index 56a8cc8..189e55a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
 
+	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
 	protected Class<K> keyClass;
 	protected Class<V> valueClass;
@@ -91,12 +99,15 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	
 	@Override
 	public void configure(Configuration parameters) {
-		// configure MR InputFormat if necessary
-		if(this.mapredInputFormat instanceof Configurable) {
-			((Configurable)this.mapredInputFormat).setConf(this.jobConf);
-		}
-		else if(this.mapredInputFormat instanceof JobConfigurable) {
-			((JobConfigurable)this.mapredInputFormat).configure(this.jobConf);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			// configure MR InputFormat if necessary
+			if (this.mapredInputFormat instanceof Configurable) {
+				((Configurable) this.mapredInputFormat).setConf(this.jobConf);
+			} else if (this.mapredInputFormat instanceof JobConfigurable) {
+				((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
+			}
 		}
 	}
 	
@@ -148,13 +159,18 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
-		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-		if (this.recordReader instanceof Configurable) {
-			((Configurable) this.recordReader).setConf(jobConf);
+
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+			if (this.recordReader instanceof Configurable) {
+				((Configurable) this.recordReader).setConf(jobConf);
+			}
+			key = this.recordReader.createKey();
+			value = this.recordReader.createValue();
+			this.fetched = false;
 		}
-		key = this.recordReader.createKey();
-		value = this.recordReader.createValue();
-		this.fetched = false;
 	}
 	
 	@Override
@@ -172,7 +188,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void close() throws IOException {
-		this.recordReader.close();
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordReader.close();
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d656fc33/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index 456003f..40214f2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -54,6 +54,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	private static final long serialVersionUID = 1L;
 
+	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private JobConf jobConf;
 	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
@@ -77,12 +85,15 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	@Override
 	public void configure(Configuration parameters) {
-		// configure MR OutputFormat if necessary
-		if(this.mapredOutputFormat instanceof Configurable) {
-			((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
-		}
-		else if(this.mapredOutputFormat instanceof JobConfigurable) {
-			((JobConfigurable)this.mapredOutputFormat).configure(this.jobConf);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			// configure MR OutputFormat if necessary
+			if (this.mapredOutputFormat instanceof Configurable) {
+				((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
+			} else if (this.mapredOutputFormat instanceof JobConfigurable) {
+				((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
+			}
 		}
 	}
 
@@ -94,39 +105,43 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
-				+ Integer.toString(taskNumber + 1)
-				+ "_0");
 
-		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
-
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+			if (Integer.toString(taskNumber + 1).length() > 6) {
+				throw new IOException("Task id too large.");
+			}
+
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+					+ Integer.toString(taskNumber + 1)
+					+ "_0");
+
+			this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+			this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+			// for hadoop 2.2
+			this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+			this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+
+			try {
+				this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.outputCommitter = this.jobConf.getOutputCommitter();
+
+			JobContext jobContext;
+			try {
+				jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.outputCommitter.setupJob(jobContext);
+
+			this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
 		}
-
-		this.outputCommitter = this.jobConf.getOutputCommitter();
-
-		JobContext jobContext;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.outputCommitter.setupJob(jobContext);
-
-		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
 	}
 
 	/**
@@ -135,10 +150,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void close() throws IOException {
-		this.recordWriter.close(new HadoopDummyReporter());
-		
-		if (this.outputCommitter.needsTaskCommit(this.context)) {
-			this.outputCommitter.commitTask(this.context);
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordWriter.close(new HadoopDummyReporter());
+
+			if (this.outputCommitter.needsTaskCommit(this.context)) {
+				this.outputCommitter.commitTask(this.context);
+			}
 		}
 	}
 	

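As in the input format, open() now runs under OPEN_MUTEX. The TaskAttemptID it builds zero-pads the one-based task number to six digits (the preceding length check rejects larger ids). A standalone sketch of the same padding, using a hypothetical helper name padTaskAttemptId that does not exist in the code:

    // Sketch only: produces the same id string as the format/replace construct in the diff,
    // e.g. taskNumber = 6 -> "attempt__0000_r_000007_0".
    static String padTaskAttemptId(int taskNumber) {
        return "attempt__0000_r_" + String.format("%06d", taskNumber + 1) + "_0";
    }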
http://git-wip-us.apache.org/repos/asf/flink/blob/d656fc33/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 09435e2..e9b23f7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
 
+	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	// NOTE: this class is using a custom serialization logic, without a defaultWriteObject() method.
 	// Hence, all fields here are "transient".
 	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
@@ -89,8 +97,12 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void configure(Configuration parameters) {
-		if (mapreduceInputFormat instanceof Configurable) {
-			((Configurable) mapreduceInputFormat).setConf(configuration);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			if (mapreduceInputFormat instanceof Configurable) {
+				((Configurable) mapreduceInputFormat).setConf(configuration);
+			}
 		}
 	}
 
@@ -169,21 +181,26 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
-		TaskAttemptContext context;
-		try {
-			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-		} catch(Exception e) {
-			throw new RuntimeException(e);
-		}
 
-		try {
-			this.recordReader = this.mapreduceInputFormat
-					.createRecordReader(split.getHadoopInputSplit(), context);
-			this.recordReader.initialize(split.getHadoopInputSplit(), context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordReader.", e);
-		} finally {
-			this.fetched = false;
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			TaskAttemptContext context;
+			try {
+				context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			try {
+				this.recordReader = this.mapreduceInputFormat
+						.createRecordReader(split.getHadoopInputSplit(), context);
+				this.recordReader.initialize(split.getHadoopInputSplit(), context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not create RecordReader.", e);
+			} finally {
+				this.fetched = false;
+			}
 		}
 	}
 
@@ -207,7 +224,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void close() throws IOException {
-		this.recordReader.close();
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordReader.close();
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d656fc33/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 72c105b..dc475e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -49,6 +49,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	private static final long serialVersionUID = 1L;
 
+	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private org.apache.hadoop.conf.Configuration configuration;
 	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
@@ -73,8 +81,12 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	@Override
 	public void configure(Configuration parameters) {
-		if(this.mapreduceOutputFormat instanceof Configurable){
-			((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			if (this.mapreduceOutputFormat instanceof Configurable) {
+				((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
+			}
 		}
 	}
 
@@ -86,49 +98,53 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-
-		this.taskNumber = taskNumber+1;
 
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.output.basename", "tmp");
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+			if (Integer.toString(taskNumber + 1).length() > 6) {
+				throw new IOException("Task id too large.");
+			}
 
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
-				+ Integer.toString(taskNumber + 1)
-				+ "_0");
+			this.taskNumber = taskNumber + 1;
 
-		this.configuration.set("mapred.task.id", taskAttemptID.toString());
-		this.configuration.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
-
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
-			this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
+			// for hadoop 2.2
+			this.configuration.set("mapreduce.output.basename", "tmp");
 
-		this.context.getCredentials().addAll(this.credentials);
-		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if(currentUserCreds != null) {
-			this.context.getCredentials().addAll(currentUserCreds);
-		}
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+					+ Integer.toString(taskNumber + 1)
+					+ "_0");
 
-		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-		if(outputCommitter instanceof FileOutputCommitter) {
-			this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter)this.outputCommitter).getWorkPath().toString());
-		}
-
-		try {
-			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordWriter.", e);
+			this.configuration.set("mapred.task.id", taskAttemptID.toString());
+			this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+			// for hadoop 2.2
+			this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+			this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+
+			try {
+				this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+				this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
+				this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.context.getCredentials().addAll(this.credentials);
+			Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+			if (currentUserCreds != null) {
+				this.context.getCredentials().addAll(currentUserCreds);
+			}
+
+			// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+			if (outputCommitter instanceof FileOutputCommitter) {
+				this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter) this.outputCommitter).getWorkPath().toString());
+			}
+
+			try {
+				this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not create RecordWriter.", e);
+			}
 		}
 	}
 
@@ -138,27 +154,31 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void close() throws IOException {
-		try {
-			this.recordWriter.close(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not close RecordReader.", e);
-		}
-		
-		if (this.outputCommitter.needsTaskCommit(this.context)) {
-			this.outputCommitter.commitTask(this.context);
-		}
-		
-		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-		
-		// rename tmp-file to final name
-		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
-		
-		String taskNumberStr = Integer.toString(this.taskNumber);
-		String tmpFileTemplate = "tmp-r-00000";
-		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
-		
-		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
-			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			try {
+				this.recordWriter.close(this.context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not close RecordReader.", e);
+			}
+
+			if (this.outputCommitter.needsTaskCommit(this.context)) {
+				this.outputCommitter.commitTask(this.context);
+			}
+
+			Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+			// rename tmp-file to final name
+			FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+
+			String taskNumberStr = Integer.toString(this.taskNumber);
+			String tmpFileTemplate = "tmp-r-00000";
+			String tmpFile = tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
+
+			if (fs.exists(new Path(outputPath.toString() + "/" + tmpFile))) {
+				fs.rename(new Path(outputPath.toString() + "/" + tmpFile), new Path(outputPath.toString() + "/" + taskNumberStr));
+			}
 		}
 	}
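For the mapreduce OutputFormat, close() also renames the per-task temporary file: since "mapreduce.output.basename" is set to "tmp" in open(), Hadoop 2.2 writes files following the "tmp-r-00000" template, and close() overlays the task number onto the right end of that template before renaming the file to the plain task number. A standalone sketch of the name derivation, using a hypothetical helper name tmpFileName that does not exist in the code:

    // Sketch only: same substring arithmetic as in close() above,
    // e.g. taskNumber = 7  -> "tmp-r-00007"
    //      taskNumber = 42 -> "tmp-r-00042"
    static String tmpFileName(int taskNumber) {
        String taskNumberStr = Integer.toString(taskNumber);
        String tmpFileTemplate = "tmp-r-00000";
        return tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
    }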