You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:16:13 UTC
[flink] 05/06: [FLINK-10366] [s3] Consolidate shared classes for S3
in flink-s3-fs-base
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f113cf058c15b3fe70ac116e768519995a5c477
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 13 21:25:25 2018 +0200
[FLINK-10366] [s3] Consolidate shared classes for S3 in flink-s3-fs-base
Some classes were previously placed, incorrectly, in flink-hadoop-fs
---
flink-filesystems/flink-s3-fs-base/pom.xml | 6 ++++
.../fs/s3/common/AbstractS3FileSystemFactory.java} | 16 +++++----
.../flink/fs/s3/common/FlinkS3FileSystem.java | 39 ++++++++++++++++++++++
.../flink/fs/s3/common}/HadoopConfigLoader.java | 9 +++--
.../flink/fs/s3hadoop/S3FileSystemFactory.java | 6 ++--
.../flink/fs/s3hadoop/HadoopS3FileSystemTest.java | 2 +-
.../flink/fs/s3presto/S3FileSystemFactory.java | 6 ++--
.../flink/fs/s3presto/PrestoS3FileSystemTest.java | 8 ++---
8 files changed, 71 insertions(+), 21 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml
index c1e30ac..6cc877d 100644
--- a/flink-filesystems/flink-s3-fs-base/pom.xml
+++ b/flink-filesystems/flink-s3-fs-base/pom.xml
@@ -155,6 +155,12 @@ under the License.
<pattern>com.google</pattern>
<shadedPattern>org.apache.flink.fs.s3base.shaded.com.google</shadedPattern>
</relocation>
+
+ <!-- shade Flink's Hadoop FS adapter classes -->
+ <relocation>
+ <pattern>org.apache.flink.runtime.fs.hdfs</pattern>
+ <shadedPattern>org.apache.flink.fs.s3.common.hadoop</shadedPattern>
+ </relocation>
</relocations>
<filters>
<filter>
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
similarity index 83%
rename from flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 63919ba..ed24138 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.fs.hdfs;
+package org.apache.flink.fs.s3.common;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
@@ -29,15 +29,16 @@ import java.io.IOException;
import java.net.URI;
/** Base class for Hadoop file system factories. */
-public abstract class AbstractFileSystemFactory implements FileSystemFactory {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystemFactory.class);
+public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractS3FileSystemFactory.class);
/** Name of this factory for logging. */
private final String name;
private final HadoopConfigLoader hadoopConfigLoader;
- protected AbstractFileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
+ protected AbstractS3FileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
this.name = name;
this.hadoopConfigLoader = hadoopConfigLoader;
}
@@ -49,13 +50,14 @@ public abstract class AbstractFileSystemFactory implements FileSystemFactory {
@Override
public FileSystem create(URI fsUri) throws IOException {
- LOG.debug("Creating Hadoop file system (backed by " + name + ")");
- LOG.debug("Loading Hadoop configuration for " + name);
+ LOG.debug("Creating S3 file system backed by {}", name);
+ LOG.debug("Loading Hadoop configuration for {}", name);
+
try {
org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
- return new HadoopFileSystem(fs);
+ return new FlinkS3FileSystem(fs);
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
new file mode 100644
index 0000000..a3d960a
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common;
+
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for S3.
+ * This class provides the common behavior that is implemented directly by Flink and delegates
+ * common calls to an implementation of Hadoop's file system abstraction.
+ */
+public class FlinkS3FileSystem extends HadoopFileSystem {
+
+ /**
+ * Wraps the given Hadoop S3 File System object as a Flink S3 File System object.
+ * The given Hadoop file system object is expected to be initialized already.
+ *
+ * @param hadoopS3FileSystem The Hadoop FileSystem that will be used under the hood.
+ */
+ public FlinkS3FileSystem(org.apache.hadoop.fs.FileSystem hadoopS3FileSystem) {
+ super(hadoopS3FileSystem);
+ }
+}
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
similarity index 95%
rename from flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
index 82916f3..5ca497c 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.fs.hdfs;
+package org.apache.flink.fs.s3.common;
import org.apache.flink.configuration.Configuration;
@@ -27,8 +27,11 @@ import javax.annotation.Nonnull;
import java.util.Set;
-/** This class lazily loads hadoop configuration from resettable Flink's configuration. */
+/**
+ * This class lazily loads the Hadoop configuration from Flink's resettable configuration.
+ */
public class HadoopConfigLoader {
+
private static final Logger LOG = LoggerFactory.getLogger(HadoopConfigLoader.class);
/** The prefixes that Flink adds to the Hadoop fs config. */
@@ -80,7 +83,7 @@ public class HadoopConfigLoader {
}
else {
LOG.warn("Flink configuration is not set prior to loading this configuration."
- + " Using Hadoop configuration from the classpath.");
+ + " Cannot forward configuration keys from Flink configuration.");
hadoopConfig = new org.apache.hadoop.conf.Configuration();
}
}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 314d34c..2b46dbd 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -19,8 +19,8 @@
package org.apache.flink.fs.s3hadoop;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
@@ -33,7 +33,7 @@ import java.util.Set;
/**
* Simple factory for the S3 file system.
*/
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.singleton("com.amazonaws.");
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index 647a937..6faf5b2 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.fs.s3hadoop;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.junit.Test;
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index d4ad561..4b9db97 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,8 +19,8 @@
package org.apache.flink.fs.s3presto;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.PrestoS3FileSystem;
@@ -33,7 +33,7 @@ import java.util.Set;
/**
* Simple factory for the S3 file system.
*/
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.singleton("com.amazonaws.");
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 4eeb2d4..e7fb8fe 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
@@ -95,9 +95,9 @@ public class PrestoS3FileSystemTest {
// ------------------------------------------------------------------------
private static void validateBasicCredentials(FileSystem fs) throws Exception {
- assertTrue(fs instanceof HadoopFileSystem);
+ assertTrue(fs instanceof FlinkS3FileSystem);
- org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) fs).getHadoopFileSystem();
+ org.apache.hadoop.fs.FileSystem hadoopFs = ((FlinkS3FileSystem) fs).getHadoopFileSystem();
assertTrue(hadoopFs instanceof PrestoS3FileSystem);
try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {