Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/28 16:22:28 UTC

[2/2] flink git commit: [FLINK-7134] Remove hadoop1.x code in mapreduce.utils.HadoopUtils

[FLINK-7134] Remove hadoop1.x code in mapreduce.utils.HadoopUtils

This closes #4362.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e367731
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e367731
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e367731

Branch: refs/heads/master
Commit: 8e367731019ee70e2f6dc1be21c80f51a2ef6a2b
Parents: 5725e63
Author: zhangminglei <zm...@163.com>
Authored: Thu Jul 20 20:29:48 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 28 18:20:01 2017 +0200

----------------------------------------------------------------------
 .../flink/hcatalog/HCatInputFormatBase.java     | 16 ++-----
 .../hadoop/mapreduce/HadoopInputFormatBase.java | 23 +++-------
 .../mapreduce/HadoopOutputFormatBase.java       | 10 +++--
 .../hadoop/mapreduce/utils/HadoopUtils.java     | 45 --------------------
 4 files changed, 15 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
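
For reference, a minimal sketch of the direct instantiation that replaces the
reflective HadoopUtils helpers throughout this patch. It assumes only a Hadoop
2.x hadoop-mapreduce-client-core on the classpath; the context class names are
taken from the diff below, while the demo class around them is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class ContextSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // In Hadoop 2.x, JobContext and TaskAttemptContext are interfaces,
            // so the concrete task.*Impl classes are constructed directly.
            // No reflection, and no checked Exception left to wrap.
            JobContext jobContext = new JobContextImpl(conf, new JobID());
            TaskAttemptContext taskContext =
                    new TaskAttemptContextImpl(conf, new TaskAttemptID());

            System.out.println(jobContext.getJobID());
            System.out.println(taskContext.getTaskAttemptID());
        }
    }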


http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
index 26f2fed..d3ed558 100644
--- a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -269,12 +271,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
 			throws IOException {
 		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
 
-		JobContext jobContext = null;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
+		JobContext jobContext = new JobContextImpl(configuration, new JobID());
 
 		List<InputSplit> splits;
 		try {
@@ -297,12 +294,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
 
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
-		TaskAttemptContext context = null;
-		try {
-			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
+		TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());
 
 		try {
 			this.recordReader = this.hCatInputFormat

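The createInputSplits() change above keeps the existing flow: set the minimum
split size on the Configuration, build a JobContext, and ask the wrapped
InputFormat for its splits. A sketch of that flow, with TextInputFormat and a
throwaway temp file standing in for the HCatalog input (both are assumptions,
not part of the patch):

    import java.io.File;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;

    public class SplitsSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInt("mapreduce.input.fileinputformat.split.minsize", 1);

            // Stand-in input: an empty temp file on the local file system.
            File input = File.createTempFile("splits-demo", ".txt");
            conf.set("mapreduce.input.fileinputformat.inputdir",
                    input.getAbsolutePath());

            JobContext jobContext = new JobContextImpl(conf, new JobID());
            List<InputSplit> splits = new TextInputFormat().getSplits(jobContext);
            System.out.println(splits.size() + " split(s)");
        }
    }
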
http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 576c76e..06205e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -115,12 +117,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 			return null;
 		}
 
-		JobContext jobContext;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
+		JobContext jobContext = new JobContextImpl(configuration, null);
 
 		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
 				(FileBaseStatistics) cachedStats : null;
@@ -149,12 +146,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 			throws IOException {
 		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
 
-		JobContext jobContext;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
+		JobContext jobContext = new JobContextImpl(configuration, new JobID());
 
 		jobContext.getCredentials().addAll(this.credentials);
 		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
@@ -187,12 +179,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 		// enforce sequential open() calls
 		synchronized (OPEN_MUTEX) {
 
-			TaskAttemptContext context;
-			try {
-				context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
+			TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());
 
 			try {
 				this.recordReader = this.mapreduceInputFormat

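createInputSplits() here also merges Flink's stored credentials plus the
current user's UGI credentials into the JobContext before computing splits. A
sketch of that merge, reading the credentials straight off the UGI rather than
through Flink's getCredentialsFromUGI() helper:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class CredentialsSketch {
        public static void main(String[] args) throws Exception {
            JobContext jobContext =
                    new JobContextImpl(new Configuration(), new JobID());

            // Copy the current user's tokens and secrets into the job
            // context, mirroring the addAll() calls in the hunk above.
            Credentials currentUserCreds =
                    UserGroupInformation.getCurrentUser().getCredentials();
            jobContext.getCredentials().addAll(currentUserCreds);

            System.out.println(jobContext.getCredentials().numberOfTokens()
                    + " token(s)");
        }
    }
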
http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 06af0ce..165455e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -125,9 +127,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 			this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
 
 			try {
-				this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+				this.context = new TaskAttemptContextImpl(this.configuration, taskAttemptID);
 				this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
-				this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+				this.outputCommitter.setupJob(new JobContextImpl(this.configuration, new JobID()));
 			} catch (Exception e) {
 				throw new RuntimeException(e);
 			}
@@ -196,8 +198,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 					+ Integer.toString(1)
 					+ "_0");
 
-			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
-			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+			jobContext = new JobContextImpl(this.configuration, new JobID());
+			taskContext = new TaskAttemptContextImpl(this.configuration, taskAttemptID);
 			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(taskContext);
 		} catch (Exception e) {
 			throw new RuntimeException(e);

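Both hunks above build contexts only to obtain and drive the OutputCommitter.
A sketch of that sequence, with TextOutputFormat and a temp directory standing
in for mapreduceOutputFormat and the real output path (both are assumptions):

    import java.nio.file.Files;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class CommitterSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("mapreduce.output.fileoutputformat.outputdir",
                    Files.createTempDirectory("committer-demo").toString());

            TaskAttemptID taskAttemptID =
                    TaskAttemptID.forName("attempt_201707281620_0001_m_000001_0");
            TaskAttemptContext taskContext =
                    new TaskAttemptContextImpl(conf, taskAttemptID);

            // Obtain the committer from the output format and set up the
            // job, as open() does above.
            OutputCommitter committer =
                    new TextOutputFormat<>().getOutputCommitter(taskContext);
            committer.setupJob(new JobContextImpl(conf, new JobID()));
        }
    }
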
http://git-wip-us.apache.org/repos/asf/flink/blob/8e367731/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
index d6cb7f8..3dc0c77 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -22,12 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.GlobalConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-import java.lang.reflect.Constructor;
 import java.util.Map;
 
 /**
@@ -55,46 +50,6 @@ public final class HadoopUtils {
 		}
 	}
 
-	public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
-		try {
-			Class<?> clazz;
-			// for Hadoop 1.xx
-			if (JobContext.class.isInterface()) {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
-			}
-			Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
-			JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
-
-			return context;
-		} catch (Exception e) {
-			throw new Exception("Could not create instance of JobContext.");
-		}
-	}
-
-	public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID taskAttemptID) throws Exception {
-		try {
-			Class<?> clazz;
-			// for Hadoop 1.xx
-			if (JobContext.class.isInterface()) {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
-			}
-			Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
-			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
-
-			return context;
-		} catch (Exception e) {
-			throw new Exception("Could not create instance of TaskAttemptContext.");
-		}
-	}
-
 	/**
 	 * Private constructor to prevent instantiation.
 	 */
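
A note on the removed helpers: they branched on JobContext.class.isInterface()
because JobContext is a concrete class in Hadoop 1.x but an interface
(implemented by org.apache.hadoop.mapreduce.task.JobContextImpl) in Hadoop
2.x, so the inline "Hadoop 1.xx"/"Hadoop 2.xx" comments above label the two
branches the wrong way around. With Hadoop 1.x support gone, a one-line probe
shows which side a current classpath takes:

    import org.apache.hadoop.mapreduce.JobContext;

    public class ProbeSketch {
        public static void main(String[] args) {
            // Prints true on a Hadoop 2.x classpath, where JobContext is an
            // interface; that branch is the one that loaded JobContextImpl
            // reflectively before this patch.
            System.out.println(JobContext.class.isInterface());
        }
    }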