You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 16:53:53 UTC

[2/2] flink git commit: [FLINK-7767] [file system sinks] Avoid loading Hadoop conf dynamically at runtime

[FLINK-7767] [file system sinks] Avoid loading Hadoop conf dynamically at runtime


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bad3df54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bad3df54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bad3df54

Branch: refs/heads/master
Commit: bad3df54d20677157f48c3ee1f3251d2c4bce8ba
Parents: 7843c2f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 5 11:26:13 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 6 17:43:20 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |  21 ++--
 .../connectors/fs/SequenceFileWriter.java       |   3 +-
 .../connectors/fs/bucketing/BucketingSink.java  | 117 +++++++++++++++++--
 .../fs/bucketing/BucketingSinkTest.java         |   1 +
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  26 +----
 5 files changed, 116 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index e5758e8..9e54775 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -34,13 +33,17 @@ import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.time.StopWatch;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
@@ -279,6 +282,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * User-defined FileSystem parameters.
      */
+	@Nullable
 	private Configuration fsConfig;
 
 	/**
@@ -382,19 +386,10 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	 * @throws IOException
 	 */
 	private void initFileSystem() throws IOException {
-		if (fs != null) {
-			return;
+		if (fs == null) {
+			Path path = new Path(basePath);
+			fs = BucketingSink.createHadoopFileSystem(path, fsConfig);
 		}
-		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		if (fsConfig != null) {
-			String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
-			hadoopConf.setBoolean(disableCacheName, true);
-			for (String key : fsConfig.keySet()) {
-				hadoopConf.set(key, fsConfig.getString(key, null));
-			}
-		}
-
-		fs = new Path(basePath).getFileSystem(hadoopConf);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 901589f..6f80544 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 
 import org.apache.hadoop.conf.Configuration;
@@ -90,7 +89,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 
 		CompressionCodec codec = null;
 
-		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
+		Configuration conf = fs.getConf();
 
 		if (!compressionCodecName.equals("None")) {
 			CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index cc924a4..55400c6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -42,17 +42,22 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.time.StopWatch;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -287,6 +292,7 @@ public class BucketingSink<T>
 	/**
 	 * User-defined FileSystem parameters.
 	 */
+	@Nullable
 	private Configuration fsConfig;
 
 	/**
@@ -402,19 +408,10 @@ public class BucketingSink<T>
 	 * @throws IOException
 	 */
 	private void initFileSystem() throws IOException {
-		if (fs != null) {
-			return;
+		if (fs == null) {
+			Path path = new Path(basePath);
+			fs = createHadoopFileSystem(path, fsConfig);
 		}
-		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		if (fsConfig != null) {
-			String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
-			hadoopConf.setBoolean(disableCacheName, true);
-			for (String key : fsConfig.keySet()) {
-				hadoopConf.set(key, fsConfig.getString(key, null));
-			}
-		}
-
-		fs = new Path(basePath).getFileSystem(hadoopConf);
 	}
 
 	@Override
@@ -1113,4 +1110,100 @@ public class BucketingSink<T>
 			this.lastWrittenToTime = lastWrittenToTime;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	public static FileSystem createHadoopFileSystem(
+			Path path,
+			@Nullable Configuration extraUserConf) throws IOException {
+
+		// First try to obtain the Hadoop file system through Flink's own FileSystem lookup;
+		// that way we get the proper (already resolved) Hadoop configuration.
+
+		final org.apache.flink.core.fs.FileSystem flinkFs = org.apache.flink.core.fs.FileSystem.get(path.toUri());
+		final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
+				((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
+
+		// Fast path: if the Flink file system already wraps a Hadoop file system and no extra
+		// user config was given, return the wrapped instance directly.
+		if (extraUserConf == null && hadoopFs != null) {
+			return hadoopFs;
+		}
+		else {
+			// We need to re-instantiate the Hadoop file system, because we either have
+			// a special config, or the Path gave us a Flink FS that is not backed by
+			// Hadoop (like file://).
+
+			final org.apache.hadoop.conf.Configuration hadoopConf;
+			if (hadoopFs != null) {
+				// We have a Hadoop FS but need to apply extra config; start from its configuration.
+				hadoopConf = hadoopFs.getConf();
+			}
+			else {
+				// The Path gave us a Flink FS that is not backed by Hadoop (like file://),
+				// so we need to get access to a Hadoop file system first.
+
+				// We look up Flink's FS for hdfs://, which carries the proper Hadoop configuration.
+				// The URI below is only a lookup placeholder; only the configuration is read from it.
+				// We should get rid of this once the sink is implemented against Flink's FS abstraction.
+
+				URI genericHdfsUri = URI.create("hdfs://localhost:12345/");
+				org.apache.flink.core.fs.FileSystem accessor =
+						org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(genericHdfsUri);
+
+				if (!(accessor instanceof HadoopFileSystem)) {
+					throw new IOException(
+							"Cannot instantiate a Hadoop file system to access the Hadoop configuration. " +
+							"FS for hdfs:// is " + accessor.getClass().getName());
+				}
+
+				hadoopConf = ((HadoopFileSystem) accessor).getHadoopFileSystem().getConf();
+			}
+
+			// Finalize the configuration: user entries are applied to a copy, not to hadoopConf itself.
+
+			final org.apache.hadoop.conf.Configuration finalConf;
+			if (extraUserConf == null) {
+				finalConf = hadoopConf;
+			}
+			else {
+				finalConf = new org.apache.hadoop.conf.Configuration(hadoopConf);
+
+				for (String key : extraUserConf.keySet()) {
+					finalConf.set(key, extraUserConf.getString(key, null));
+				}
+			}
+
+			// Explicitly re-instantiate the file system so that the final configuration is applied.
+			// A path lacking scheme/authority falls back to the default FS URI of that configuration.
+
+			URI fsUri = path.toUri();
+			final String scheme = fsUri.getScheme();
+			final String authority = fsUri.getAuthority();
+
+			if (scheme == null && authority == null) {
+				fsUri = FileSystem.getDefaultUri(finalConf);
+			}
+			else if (scheme != null && authority == null) {
+				URI defaultUri = FileSystem.getDefaultUri(finalConf);
+				if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+					fsUri = defaultUri;
+				}
+			}
+
+			final Class<? extends FileSystem> fsClass = FileSystem.getFileSystemClass(fsUri.getScheme(), finalConf);
+			final FileSystem fs;
+			try {
+				fs = fsClass.newInstance();
+			}
+			catch (Exception e) {
+				throw new IOException("Cannot instantiate the Hadoop file system", e);
+			}
+
+			fs.initialize(fsUri, finalConf);
+			return fs;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index d6852ef..695b696 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;

http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index fd6b9da..4ebf4bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.fs.hdfs;
 
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.util.HadoopUtils;
 
 import java.io.IOException;
 import java.net.URI;
@@ -116,6 +114,7 @@ public final class HadoopFileSystem extends FileSystem {
 	}
 
 	@Override
+	@SuppressWarnings("deprecation")
 	public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
 			final short replication, final long blockSize) throws IOException {
 		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
@@ -169,27 +168,4 @@ public final class HadoopFileSystem extends FileSystem {
 	public boolean isDistributedFS() {
 		return true;
 	}
-
-	// ------------------------------------------------------------------------
-	//  Miscellaneous Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
-	 * in the main configuration (flink-conf.yaml).
-	 * This method is public because its being used in the HadoopDataSource.
-	 *
-	 * @deprecated This method should not be used, because it dynamically (and possibly incorrectly)
-	 *             re-loads the Flink configuration.
-	 *             Use {@link HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration)}
-	 *             instead.
-	 */
-	@Deprecated
-	public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
-
-		org.apache.flink.configuration.Configuration flinkConfiguration =
-				GlobalConfiguration.loadConfiguration();
-
-		return HadoopUtils.getHadoopConfiguration(flinkConfiguration);
-	}
 }