You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/27 23:34:14 UTC
flink git commit: [FLINK-1828] [hadoop] Fixed missing call to
configure() for Configurable HadoopOutputFormats.
Repository: flink
Updated Branches:
refs/heads/master 11f1dd645 -> de573cf5c
[FLINK-1828] [hadoop] Fixed missing call to configure() for Configurable HadoopOutputFormats.
This closes #632
This closes #571
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de573cf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de573cf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de573cf5
Branch: refs/heads/master
Commit: de573cf5cef3bed6c489af85dba2cc61912db4c0
Parents: 11f1dd6
Author: fpompermaier <f....@gmail.com>
Authored: Mon Apr 27 16:38:51 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Apr 27 23:28:17 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java | 6 ++++--
.../api/java/hadoop/mapreduce/HadoopOutputFormatBase.java | 5 ++++-
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/de573cf5/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a59b96f..a6a318c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
@@ -51,7 +52,6 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
private transient JobContext jobContext;
public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
- super();
this.mapredOutputFormat = mapredOutputFormat;
HadoopUtils.mergeHadoopConf(job);
this.jobConf = job;
@@ -67,7 +67,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
@Override
public void configure(Configuration parameters) {
- // nothing to do
+ if(this.mapredOutputFormat instanceof Configurable){
+ ((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/de573cf5/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 10e631d..0e59257 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -65,7 +66,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
@Override
public void configure(Configuration parameters) {
- // nothing to do
+ if(this.mapreduceOutputFormat instanceof Configurable){
+ ((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
+ }
}
/**