You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 16:53:53 UTC
[2/2] flink git commit: [FLINK-7767] [file system sinks] Avoid loading Hadoop conf dynamically at runtime
[FLINK-7767] [file system sinks] Avoid loading Hadoop conf dynamically at runtime
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bad3df54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bad3df54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bad3df54
Branch: refs/heads/master
Commit: bad3df54d20677157f48c3ee1f3251d2c4bce8ba
Parents: 7843c2f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 5 11:26:13 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 6 17:43:20 2017 +0200
----------------------------------------------------------------------
.../streaming/connectors/fs/RollingSink.java | 21 ++--
.../connectors/fs/SequenceFileWriter.java | 3 +-
.../connectors/fs/bucketing/BucketingSink.java | 117 +++++++++++++++++--
.../fs/bucketing/BucketingSinkTest.java | 1 +
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 26 +----
5 files changed, 116 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index e5758e8..9e54775 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -34,13 +33,17 @@ import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.time.StopWatch;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
@@ -279,6 +282,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* User-defined FileSystem parameters.
*/
+ @Nullable
private Configuration fsConfig;
/**
@@ -382,19 +386,10 @@ public class RollingSink<T> extends RichSinkFunction<T>
* @throws IOException
*/
private void initFileSystem() throws IOException {
- if (fs != null) {
- return;
+ if (fs == null) {
+ Path path = new Path(basePath);
+ fs = BucketingSink.createHadoopFileSystem(path, fsConfig);
}
- org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
- if (fsConfig != null) {
- String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
- hadoopConf.setBoolean(disableCacheName, true);
- for (String key : fsConfig.keySet()) {
- hadoopConf.set(key, fsConfig.getString(key, null));
- }
- }
-
- fs = new Path(basePath).getFileSystem(hadoopConf);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 901589f..6f80544 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.conf.Configuration;
@@ -90,7 +89,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
CompressionCodec codec = null;
- Configuration conf = HadoopFileSystem.getHadoopConfiguration();
+ Configuration conf = fs.getConf();
if (!compressionCodecName.equals("None")) {
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index cc924a4..55400c6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -42,17 +42,22 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.time.StopWatch;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -287,6 +292,7 @@ public class BucketingSink<T>
/**
* User-defined FileSystem parameters.
*/
+ @Nullable
private Configuration fsConfig;
/**
@@ -402,19 +408,10 @@ public class BucketingSink<T>
* @throws IOException
*/
private void initFileSystem() throws IOException {
- if (fs != null) {
- return;
+ if (fs == null) {
+ Path path = new Path(basePath);
+ fs = createHadoopFileSystem(path, fsConfig);
}
- org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
- if (fsConfig != null) {
- String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
- hadoopConf.setBoolean(disableCacheName, true);
- for (String key : fsConfig.keySet()) {
- hadoopConf.set(key, fsConfig.getString(key, null));
- }
- }
-
- fs = new Path(basePath).getFileSystem(hadoopConf);
}
@Override
@@ -1113,4 +1110,100 @@ public class BucketingSink<T>
this.lastWrittenToTime = lastWrittenToTime;
}
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static FileSystem createHadoopFileSystem(
+ Path path,
+ @Nullable Configuration extraUserConf) throws IOException {
+
+ // try to get the Hadoop File System via the Flink File Systems
+ // that way we get the proper configuration
+
+ final org.apache.flink.core.fs.FileSystem flinkFs = org.apache.flink.core.fs.FileSystem.get(path.toUri());
+ final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
+ ((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
+
+ // fast path: if the Flink file system wraps Hadoop anyways and we need no extra config,
+ // then we use it directly
+ if (extraUserConf == null && hadoopFs != null) {
+ return hadoopFs;
+ }
+ else {
+ // we need to re-instantiate the Hadoop file system, because we either have
+ // a special config, or the Path gave us a Flink FS that is not backed by
+ // Hadoop (like file://)
+
+ final org.apache.hadoop.conf.Configuration hadoopConf;
+ if (hadoopFs != null) {
+ // have a Hadoop FS but need to apply extra config
+ hadoopConf = hadoopFs.getConf();
+ }
+ else {
+ // the Path gave us a Flink FS that is not backed by Hadoop (like file://)
+ // we need to get access to the Hadoop file system first
+
+ // we access the Hadoop FS in Flink, which carries the proper
+ // Hadoop configuration. we should get rid of this once the bucketing sink is
+ // properly implemented against Flink's FS abstraction
+
+ URI genericHdfsUri = URI.create("hdfs://localhost:12345/");
+ org.apache.flink.core.fs.FileSystem accessor =
+ org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(genericHdfsUri);
+
+ if (!(accessor instanceof HadoopFileSystem)) {
+ throw new IOException(
+ "Cannot instantiate a Hadoop file system to access the Hadoop configuration. " +
+ "FS for hdfs:// is " + accessor.getClass().getName());
+ }
+
+ hadoopConf = ((HadoopFileSystem) accessor).getHadoopFileSystem().getConf();
+ }
+
+ // finalize the configuration
+
+ final org.apache.hadoop.conf.Configuration finalConf;
+ if (extraUserConf == null) {
+ finalConf = hadoopConf;
+ }
+ else {
+ finalConf = new org.apache.hadoop.conf.Configuration(hadoopConf);
+
+ for (String key : extraUserConf.keySet()) {
+ finalConf.set(key, extraUserConf.getString(key, null));
+ }
+ }
+
+ // we explicitly re-instantiate the file system here in order to make sure
+ // that the configuration is applied.
+
+ URI fsUri = path.toUri();
+ final String scheme = fsUri.getScheme();
+ final String authority = fsUri.getAuthority();
+
+ if (scheme == null && authority == null) {
+ fsUri = FileSystem.getDefaultUri(finalConf);
+ }
+ else if (scheme != null && authority == null) {
+ URI defaultUri = FileSystem.getDefaultUri(finalConf);
+ if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+ fsUri = defaultUri;
+ }
+ }
+
+ final Class<? extends FileSystem> fsClass = FileSystem.getFileSystemClass(fsUri.getScheme(), finalConf);
+ final FileSystem fs;
+ try {
+ fs = fsClass.newInstance();
+ }
+ catch (Exception e) {
+ throw new IOException("Cannot instantiate the Hadoop file system", e);
+ }
+
+ fs.initialize(fsUri, finalConf);
+ return fs;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index d6852ef..695b696 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
http://git-wip-us.apache.org/repos/asf/flink/blob/bad3df54/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index fd6b9da..4ebf4bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -18,12 +18,10 @@
package org.apache.flink.runtime.fs.hdfs;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.util.HadoopUtils;
import java.io.IOException;
import java.net.URI;
@@ -116,6 +114,7 @@ public final class HadoopFileSystem extends FileSystem {
}
@Override
+ @SuppressWarnings("deprecation")
public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
@@ -169,27 +168,4 @@ public final class HadoopFileSystem extends FileSystem {
public boolean isDistributedFS() {
return true;
}
-
- // ------------------------------------------------------------------------
- // Miscellaneous Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
- * in the main configuration (flink-conf.yaml).
- * This method is public because its being used in the HadoopDataSource.
- *
- * @deprecated This method should not be used, because it dynamically (and possibly incorrectly)
- * re-loads the Flink configuration.
- * Use {@link HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration)}
- * instead.
- */
- @Deprecated
- public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
-
- org.apache.flink.configuration.Configuration flinkConfiguration =
- GlobalConfiguration.loadConfiguration();
-
- return HadoopUtils.getHadoopConfiguration(flinkConfiguration);
- }
}