You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/27 23:34:14 UTC

flink git commit: [FLINK-1828] [hadoop] Fixed missing call to configure() for Configurable HadoopOutputFormats.

Repository: flink
Updated Branches:
  refs/heads/master 11f1dd645 -> de573cf5c


[FLINK-1828] [hadoop] Fixed missing call to configure() for Configurable HadoopOutputFormats.

This closes #632
This closes #571


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de573cf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de573cf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de573cf5

Branch: refs/heads/master
Commit: de573cf5cef3bed6c489af85dba2cc61912db4c0
Parents: 11f1dd6
Author: fpompermaier <f....@gmail.com>
Authored: Mon Apr 27 16:38:51 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Apr 27 23:28:17 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java   | 6 ++++--
 .../api/java/hadoop/mapreduce/HadoopOutputFormatBase.java      | 5 ++++-
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de573cf5/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a59b96f..a6a318c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -51,7 +52,6 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 	private transient JobContext jobContext;
 
 	public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
-		super();
 		this.mapredOutputFormat = mapredOutputFormat;
 		HadoopUtils.mergeHadoopConf(job);
 		this.jobConf = job;
@@ -67,7 +67,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 
 	@Override
 	public void configure(Configuration parameters) {
-		// nothing to do
+		if(this.mapredOutputFormat instanceof Configurable){
+			((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/de573cf5/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 10e631d..0e59257 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -65,7 +66,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 
 	@Override
 	public void configure(Configuration parameters) {
-		// nothing to do
+		if(this.mapreduceOutputFormat instanceof Configurable){
+			((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
+		}
 	}
 
 	/**