You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/12 11:54:07 UTC

flink git commit: [FLINK-2195] Configure Configurable Hadoop InputFormats

Repository: flink
Updated Branches:
  refs/heads/release-0.9 ce3bc9c0b -> f27fc819e


[FLINK-2195] Configure Configurable Hadoop InputFormats

This closes #828.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f27fc819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f27fc819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f27fc819

Branch: refs/heads/release-0.9
Commit: f27fc819e1dcb616a59907926ad42e9b6ef4a325
Parents: ce3bc9c
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Jun 10 10:42:42 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jun 12 11:51:48 2015 +0200

----------------------------------------------------------------------
 .../hadoop/mapreduce/HadoopInputFormatBase.java     | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f27fc819/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 2a6c0f4..1236884 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.hadoop.mapreduce;
 
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
@@ -30,6 +29,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -46,6 +46,8 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> {
 
 	private static final long serialVersionUID = 1L;
@@ -63,10 +65,10 @@ public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, H
 
 	public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
 		super();
-		this.mapreduceInputFormat = mapreduceInputFormat;
-		this.keyClass = key;
-		this.valueClass = value;
-		this.configuration = job.getConfiguration();
+		this.mapreduceInputFormat = checkNotNull(mapreduceInputFormat);
+		this.keyClass = checkNotNull(key);
+		this.valueClass = checkNotNull(value);
+		this.configuration = checkNotNull(job).getConfiguration();
 		HadoopUtils.mergeHadoopConf(configuration);
 	}
 
@@ -80,7 +82,9 @@ public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, H
 
 	@Override
 	public void configure(Configuration parameters) {
-		// nothing to do
+		if (mapreduceInputFormat instanceof Configurable) {
+			((Configurable) mapreduceInputFormat).setConf(configuration);
+		}
 	}
 
 	@Override