You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/28 16:22:28 UTC
[2/2] flink git commit: [FLINK-7134] Remove hadoop1.x code in mapreduce.utils.HadoopUtils
[FLINK-7134] Remove hadoop1.x code in mapreduce.utils.HadoopUtils
This closes #4362.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e367731
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e367731
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e367731
Branch: refs/heads/master
Commit: 8e367731019ee70e2f6dc1be21c80f51a2ef6a2b
Parents: 5725e63
Author: zhangminglei <zm...@163.com>
Authored: Thu Jul 20 20:29:48 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 28 18:20:01 2017 +0200
----------------------------------------------------------------------
.../flink/hcatalog/HCatInputFormatBase.java | 16 ++-----
.../hadoop/mapreduce/HadoopInputFormatBase.java | 23 +++-------
.../mapreduce/HadoopOutputFormatBase.java | 10 +++--
.../hadoop/mapreduce/utils/HadoopUtils.java | 45 --------------------
4 files changed, 15 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
index 26f2fed..d3ed558 100644
--- a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -269,12 +271,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
throws IOException {
configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
- JobContext jobContext = null;
- try {
- jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
List<InputSplit> splits;
try {
@@ -297,12 +294,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
@Override
public void open(HadoopInputSplit split) throws IOException {
- TaskAttemptContext context = null;
- try {
- context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());
try {
this.recordReader = this.hCatInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 576c76e..06205e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -115,12 +117,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
return null;
}
- JobContext jobContext;
- try {
- jobContext = HadoopUtils.instantiateJobContext(configuration, null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ JobContext jobContext = new JobContextImpl(configuration, null);
final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null;
@@ -149,12 +146,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
throws IOException {
configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
- JobContext jobContext;
- try {
- jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
jobContext.getCredentials().addAll(this.credentials);
Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
@@ -187,12 +179,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
// enforce sequential open() calls
synchronized (OPEN_MUTEX) {
- TaskAttemptContext context;
- try {
- context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());
try {
this.recordReader = this.mapreduceInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 06af0ce..165455e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -125,9 +127,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
try {
- this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+ this.context = new TaskAttemptContextImpl(this.configuration, taskAttemptID);
this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
- this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+ this.outputCommitter.setupJob(new JobContextImpl(this.configuration, new JobID()));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -196,8 +198,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
+ Integer.toString(1)
+ "_0");
- jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
- taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+ jobContext = new JobContextImpl(this.configuration, new JobID());
+ taskContext = new TaskAttemptContextImpl(this.configuration, taskAttemptID);
this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(taskContext);
} catch (Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
index d6cb7f8..3dc0c77 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -22,12 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import java.lang.reflect.Constructor;
import java.util.Map;
/**
@@ -55,46 +50,6 @@ public final class HadoopUtils {
}
}
- public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
- try {
- Class<?> clazz;
- // for Hadoop 1.xx
- if (JobContext.class.isInterface()) {
- clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
- }
- // for Hadoop 2.xx
- else {
- clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
- }
- Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
- JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
-
- return context;
- } catch (Exception e) {
- throw new Exception("Could not create instance of JobContext.");
- }
- }
-
- public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception {
- try {
- Class<?> clazz;
- // for Hadoop 1.xx
- if (JobContext.class.isInterface()) {
- clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
- }
- // for Hadoop 2.xx
- else {
- clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
- }
- Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
- TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
-
- return context;
- } catch (Exception e) {
- throw new Exception("Could not create instance of TaskAttemptContext.");
- }
- }
-
/**
* Private constructor to prevent instantiation.
*/