You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/12 11:54:07 UTC
flink git commit: [FLINK-2195] Configure Configurable Hadoop InputFormats
Repository: flink
Updated Branches:
refs/heads/release-0.9 ce3bc9c0b -> f27fc819e
[FLINK-2195] Configure Configurable Hadoop InputFormats
This closes #828.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f27fc819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f27fc819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f27fc819
Branch: refs/heads/release-0.9
Commit: f27fc819e1dcb616a59907926ad42e9b6ef4a325
Parents: ce3bc9c
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Jun 10 10:42:42 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jun 12 11:51:48 2015 +0200
----------------------------------------------------------------------
.../hadoop/mapreduce/HadoopInputFormatBase.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f27fc819/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 2a6c0f4..1236884 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.hadoop.mapreduce;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
@@ -30,6 +29,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -46,6 +46,8 @@ import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> {
private static final long serialVersionUID = 1L;
@@ -63,10 +65,10 @@ public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, H
public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
super();
- this.mapreduceInputFormat = mapreduceInputFormat;
- this.keyClass = key;
- this.valueClass = value;
- this.configuration = job.getConfiguration();
+ this.mapreduceInputFormat = checkNotNull(mapreduceInputFormat);
+ this.keyClass = checkNotNull(key);
+ this.valueClass = checkNotNull(value);
+ this.configuration = checkNotNull(job).getConfiguration();
HadoopUtils.mergeHadoopConf(configuration);
}
@@ -80,7 +82,9 @@ public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, H
@Override
public void configure(Configuration parameters) {
- // nothing to do
+ if (mapreduceInputFormat instanceof Configurable) {
+ ((Configurable) mapreduceInputFormat).setConf(configuration);
+ }
}
@Override