Posted to commits@flink.apache.org by uc...@apache.org on 2015/08/26 14:25:39 UTC

flink git commit: [FLINK-2394] HadoopOutputFormats use correct OutputCommitters.

Repository: flink
Updated Branches:
  refs/heads/master bc6278716 -> 6e1de9889


[FLINK-2394] HadoopOutputFormats use correct OutputCommitters.

This closes #1056.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e1de988
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e1de988
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e1de988

Branch: refs/heads/master
Commit: 6e1de9889fb4c6fdf0cd6f4892bdbbafab4c1e48
Parents: bc62787
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 30 21:47:01 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 26 14:25:03 2015 +0200

----------------------------------------------------------------------
 .../java/hadoop/mapred/HadoopOutputFormat.java  |  6 +++
 .../hadoop/mapred/HadoopOutputFormatBase.java   | 16 ++++----
 .../mapreduce/HadoopOutputFormatBase.java       | 42 ++++++++++----------
 .../hadoop/mapred/HadoopOutputFormat.scala      | 11 ++++-
 4 files changed, 45 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
index 44cfb57..0b7368c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCommitter;
 
 public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
 
@@ -31,6 +32,11 @@ public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2
 		super(mapredOutputFormat, job);
 	}
 
+	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, Class<OutputCommitter> outputCommitterClass, JobConf job) {
+		this(mapredOutputFormat, job);
+		super.getJobConf().setOutputCommitter(outputCommitterClass);
+	}
+
 	@Override
 	public void writeRecord(Tuple2<K, V> record) throws IOException {
 		this.recordWriter.write(record.f0, record.f1);

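The new constructor is a convenience: it registers the given committer class on the
JobConf before the job runs. A minimal usage sketch follows (not part of the commit;
NoOpCommitter is a hypothetical OutputCommitter subclass, defined only so the snippet
is self-contained). Because the parameter is typed Class<OutputCommitter>, a concrete
subclass needs an unchecked cast; calling jobConf.setOutputCommitter(...) yourself and
using the two-argument constructor is equivalent.

import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TextOutputFormat;

public class CustomCommitterExample {

    // Hypothetical no-op committer, present only to make the sketch compile.
    public static class NoOpCommitter extends OutputCommitter {
        @Override public void setupJob(JobContext jobContext) {}
        @Override public void setupTask(TaskAttemptContext taskContext) {}
        @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) { return false; }
        @Override public void commitTask(TaskAttemptContext taskContext) {}
        @Override public void abortTask(TaskAttemptContext taskContext) {}
    }

    @SuppressWarnings("unchecked")
    public static HadoopOutputFormat<Text, IntWritable> create() {
        JobConf jobConf = new JobConf();
        FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///tmp/example-out"));

        // The three-argument constructor calls
        // getJobConf().setOutputCommitter(outputCommitterClass) internally.
        return new HadoopOutputFormat<Text, IntWritable>(
                new TextOutputFormat<Text, IntWritable>(),
                (Class<OutputCommitter>) (Class<?>) NoOpCommitter.class,
                jobConf);
    }
}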
http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a5baa7e..a1bb906 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
@@ -48,7 +48,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 	private JobConf jobConf;
 	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
-	private transient FileOutputCommitter fileOutputCommitter;
+	private transient OutputCommitter outputCommitter;
 	private transient TaskAttemptContext context;
 	private transient JobContext jobContext;
 
@@ -106,7 +106,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 			throw new RuntimeException(e);
 		}
 
-		this.fileOutputCommitter = new FileOutputCommitter();
+		this.outputCommitter = this.jobConf.getOutputCommitter();
 
 		try {
 			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
@@ -114,7 +114,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 			throw new RuntimeException(e);
 		}
 
-		this.fileOutputCommitter.setupJob(jobContext);
+		this.outputCommitter.setupJob(jobContext);
 
 		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
 	}
@@ -127,8 +127,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 	public void close() throws IOException {
 		this.recordWriter.close(new HadoopDummyReporter());
 		
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
+		if (this.outputCommitter.needsTaskCommit(this.context)) {
+			this.outputCommitter.commitTask(this.context);
 		}
 	}
 	
@@ -137,10 +137,10 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 
 		try {
 			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+			OutputCommitter outputCommitter = this.jobConf.getOutputCommitter();
 			
 			// finalize HDFS output format
-			fileOutputCommitter.commitJob(jobContext);
+			outputCommitter.commitJob(jobContext);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}

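For context (a property of Hadoop's mapred API, not something added by this diff):
JobConf.getOutputCommitter() instantiates the class configured under
"mapred.output.committer.class" and falls back to FileOutputCommitter when nothing is
set. The rewritten open() and finalizeGlobal() therefore keep the previous default
behavior while honoring a user-configured committer. A minimal check, assuming a plain
JobConf with no committer configured:

import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;

public class DefaultCommitterCheck {
    public static void main(String[] args) {
        // Nothing configured: Hadoop falls back to FileOutputCommitter,
        // which is exactly what HadoopOutputFormatBase hard-coded before.
        OutputCommitter committer = new JobConf().getOutputCommitter();
        System.out.println(committer instanceof FileOutputCommitter); // prints true
    }
}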
http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 14d5c81..74ed6d0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -45,7 +46,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 	private org.apache.hadoop.conf.Configuration configuration;
 	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
-	private transient FileOutputCommitter fileOutputCommitter;
+	private transient OutputCommitter outputCommitter;
 	private transient TaskAttemptContext context;
 	private transient int taskNumber;
 
@@ -101,20 +102,16 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 
 		try {
 			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
-
-		try {
-			this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
+			this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
 
 		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-		this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+		if(outputCommitter instanceof FileOutputCommitter) {
+			this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter)this.outputCommitter).getWorkPath().toString());
+		}
 
 		try {
 			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
@@ -135,8 +132,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 			throw new IOException("Could not close RecordReader.", e);
 		}
 		
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
+		if (this.outputCommitter.needsTaskCommit(this.context)) {
+			this.outputCommitter.commitTask(this.context);
 		}
 		
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
@@ -152,28 +149,31 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
 			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
 		}
 	}
-	
+
 	@Override
 	public void finalizeGlobal(int parallelism) throws IOException {
 
 		JobContext jobContext;
 		TaskAttemptContext taskContext;
 		try {
-			
-			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
-					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0") 
-					+ Integer.toString(1) 
+
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
+					+ Integer.toString(1)
 					+ "_0");
-			
+
 			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
 			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(taskContext);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
-		
+
 		// finalize HDFS output format
-		this.fileOutputCommitter.commitJob(jobContext);
+		if(this.outputCommitter != null) {
+			this.outputCommitter.commitJob(jobContext);
+		}
+
 	}
 	
 	// --------------------------------------------------------------------------------------------

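In the mapreduce API the committer comes from the OutputFormat itself, via
OutputFormat.getOutputCommitter(TaskAttemptContext); previously the base class ignored
it and always built a FileOutputCommitter over "mapred.output.dir". That is why the
"mapreduce.task.output.dir" handling above is now guarded by an instanceof check:
non-file formats return committers that have no work path. A hypothetical non-file
format sketches that case (none of these names appear in the commit):

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// A format that writes nowhere; its committer is not a FileOutputCommitter,
// so the old code, which always built a FileOutputCommitter, used the wrong
// committer for formats like this one.
public class NoOpOutputFormat<K, V> extends OutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
        return new RecordWriter<K, V>() {
            @Override public void write(K key, V value) { /* discard */ }
            @Override public void close(TaskAttemptContext context) {}
        };
    }

    @Override
    public void checkOutputSpecs(JobContext context) { /* nothing to verify */ }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
        // No files are written, so all commit hooks are no-ops.
        return new OutputCommitter() {
            @Override public void setupJob(JobContext jobContext) {}
            @Override public void setupTask(TaskAttemptContext taskContext) {}
            @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) { return false; }
            @Override public void commitTask(TaskAttemptContext taskContext) {}
            @Override public void abortTask(TaskAttemptContext taskContext) {}
        };
    }
}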
http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
index 56b7a7f..68b4922 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -18,11 +18,20 @@
 package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
 
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
   extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
 
+  def this(
+      mapredOutputFormat: OutputFormat[K, V],
+      outputCommitterClass: Class[OutputCommitter],
+      job: JobConf) {
+
+    this(mapredOutputFormat, job)
+    this.getJobConf.setOutputCommitter(outputCommitterClass)
+  }
+
   def writeRecord(record: (K, V)) {
     this.recordWriter.write(record._1, record._2)
   }