You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/06 15:01:18 UTC

flink git commit: [FLINK-7109] [hadoop] Remove GlobalConfiguration.loadConfiguration from HadoopUtils#getHadoopConfiguration

Repository: flink
Updated Branches:
  refs/heads/master 6c05abe34 -> 10156c9fc


[FLINK-7109] [hadoop] Remove GlobalConfiguration.loadConfiguration from HadoopUtils#getHadoopConfiguration

HadoopUtils#getHadoopConfiguration should not load the global configuration itself. Instead,
the Flink configuration is passed in as a parameter.

This closes #4265.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10156c9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10156c9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10156c9f

Branch: refs/heads/master
Commit: 10156c9fce46f1b2ac26332d5d51b7fd493c5c0c
Parents: 6c05abe
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 6 01:07:16 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 6 17:00:49 2017 +0200

----------------------------------------------------------------------
 .../api/java/hadoop/mapred/utils/HadoopUtils.java   | 16 ++++++++++------
 .../java/hadoop/mapreduce/utils/HadoopUtils.java    |  8 +++++++-
 .../flink/runtime/security/SecurityUtils.java       |  2 +-
 3 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10156c9f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index da8244f..07746fe 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -54,7 +54,11 @@ public final class HadoopUtils {
 	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
 	 */
 	public static void mergeHadoopConf(JobConf jobConf) {
-		org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration();
+		// we have to load the global configuration here, because the HadoopInputFormatBase does not
+		// have access to a Flink configuration object
+		org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+
+		Configuration hadoopConf = getHadoopConfiguration(flinkConfiguration);
 		for (Map.Entry<String, String> e : hadoopConf) {
 			if (jobConf.get(e.getKey()) == null) {
 				jobConf.set(e.getKey(), e.getValue());
@@ -109,13 +113,13 @@ public final class HadoopUtils {
 	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
 	 * in the main configuration (flink-conf.yaml).
 	 * This method is public because its being used in the HadoopDataSource.
+	 *
+	 * @param flinkConfiguration Flink configuration object
+	 * @return A Hadoop configuration instance
 	 */
-	public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
-
-		org.apache.flink.configuration.Configuration flinkConfiguration =
-			GlobalConfiguration.loadConfiguration();
+	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
 
-		Configuration retConf = new org.apache.hadoop.conf.Configuration();
+		Configuration retConf = new Configuration();
 
 		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
 		// the hdfs configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/10156c9f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
index 52fd734..cf0d057 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Constructor;
 import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -39,8 +41,12 @@ public final class HadoopUtils {
 	 */
 	public static void mergeHadoopConf(Configuration hadoopConfig) {
 
+		// we have to load the global configuration here, because the HadoopInputFormatBase does not
+		// have access to a Flink configuration object
+		org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+
 		Configuration hadoopConf =
-			org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
+			org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(flinkConfiguration);
 
 		for (Map.Entry<String, String> e : hadoopConf) {
 			if (hadoopConfig.get(e.getKey()) == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/10156c9f/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index df49822..9e6f402 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -146,7 +146,7 @@ public class SecurityUtils {
 		 * @param flinkConf the Flink global configuration.
          */
 		public SecurityConfiguration(Configuration flinkConf) {
-			this(flinkConf, HadoopUtils.getHadoopConfiguration());
+			this(flinkConf, HadoopUtils.getHadoopConfiguration(flinkConf));
 		}
 
 		/**