You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/08/26 14:25:39 UTC
flink git commit: [FLINK-2394] HadoopOutputFormats use correct OutputCommitters.
Repository: flink
Updated Branches:
refs/heads/master bc6278716 -> 6e1de9889
[FLINK-2394] HadoopOutputFormats use correct OutputCommitters.
This closes #1056.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e1de988
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e1de988
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e1de988
Branch: refs/heads/master
Commit: 6e1de9889fb4c6fdf0cd6f4892bdbbafab4c1e48
Parents: bc62787
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 30 21:47:01 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 26 14:25:03 2015 +0200
----------------------------------------------------------------------
.../java/hadoop/mapred/HadoopOutputFormat.java | 6 +++
.../hadoop/mapred/HadoopOutputFormatBase.java | 16 ++++----
.../mapreduce/HadoopOutputFormatBase.java | 42 ++++++++++----------
.../hadoop/mapred/HadoopOutputFormat.scala | 11 ++++-
4 files changed, 45 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
index 44cfb57..0b7368c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCommitter;
public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
@@ -31,6 +32,11 @@ public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2
super(mapredOutputFormat, job);
}
+ public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, Class<OutputCommitter> outputCommitterClass, JobConf job) {
+ this(mapredOutputFormat, job);
+ super.getJobConf().setOutputCommitter(outputCommitterClass);
+ }
+
@Override
public void writeRecord(Tuple2<K, V> record) throws IOException {
this.recordWriter.write(record.f0, record.f1);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a5baa7e..a1bb906 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
@@ -48,7 +48,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
private JobConf jobConf;
private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
protected transient RecordWriter<K,V> recordWriter;
- private transient FileOutputCommitter fileOutputCommitter;
+ private transient OutputCommitter outputCommitter;
private transient TaskAttemptContext context;
private transient JobContext jobContext;
@@ -106,7 +106,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
throw new RuntimeException(e);
}
- this.fileOutputCommitter = new FileOutputCommitter();
+ this.outputCommitter = this.jobConf.getOutputCommitter();
try {
this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
@@ -114,7 +114,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
throw new RuntimeException(e);
}
- this.fileOutputCommitter.setupJob(jobContext);
+ this.outputCommitter.setupJob(jobContext);
this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
@@ -127,8 +127,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
public void close() throws IOException {
this.recordWriter.close(new HadoopDummyReporter());
- if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
- this.fileOutputCommitter.commitTask(this.context);
+ if (this.outputCommitter.needsTaskCommit(this.context)) {
+ this.outputCommitter.commitTask(this.context);
}
}
@@ -137,10 +137,10 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
try {
JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
- FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+ OutputCommitter outputCommitter = this.jobConf.getOutputCommitter();
// finalize HDFS output format
- fileOutputCommitter.commitJob(jobContext);
+ outputCommitter.commitJob(jobContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 14d5c81..74ed6d0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -45,7 +46,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
private org.apache.hadoop.conf.Configuration configuration;
private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
protected transient RecordWriter<K,V> recordWriter;
- private transient FileOutputCommitter fileOutputCommitter;
+ private transient OutputCommitter outputCommitter;
private transient TaskAttemptContext context;
private transient int taskNumber;
@@ -101,20 +102,16 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
try {
this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
-
- try {
- this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+ this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
+ this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
} catch (Exception e) {
throw new RuntimeException(e);
}
// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
- this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+ if(outputCommitter instanceof FileOutputCommitter) {
+ this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter)this.outputCommitter).getWorkPath().toString());
+ }
try {
this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
@@ -135,8 +132,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
throw new IOException("Could not close RecordReader.", e);
}
- if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
- this.fileOutputCommitter.commitTask(this.context);
+ if (this.outputCommitter.needsTaskCommit(this.context)) {
+ this.outputCommitter.commitTask(this.context);
}
Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
@@ -152,28 +149,31 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
}
}
-
+
@Override
public void finalizeGlobal(int parallelism) throws IOException {
JobContext jobContext;
TaskAttemptContext taskContext;
try {
-
- TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
- + String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
- + Integer.toString(1)
+
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ + String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
+ + Integer.toString(1)
+ "_0");
-
+
jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+ this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(taskContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
- this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
-
+
// finalize HDFS output format
- this.fileOutputCommitter.commitJob(jobContext);
+ if(this.outputCommitter != null) {
+ this.outputCommitter.commitJob(jobContext);
+ }
+
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e1de988/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
index 56b7a7f..68b4922 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -18,11 +18,20 @@
package org.apache.flink.api.scala.hadoop.mapred
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
+ def this(
+ mapredOutputFormat: OutputFormat[K, V],
+ outputCommitterClass: Class[OutputCommitter],
+ job: JobConf) {
+
+ this(mapredOutputFormat, job)
+ this.getJobConf.setOutputCommitter(outputCommitterClass)
+ }
+
def writeRecord(record: (K, V)) {
this.recordWriter.write(record._1, record._2)
}