You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:16:13 UTC

[flink] 05/06: [FLINK-10366] [s3] Consolidate shared classes for S3 in flink-s3-fs-base

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f113cf058c15b3fe70ac116e768519995a5c477
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 13 21:25:25 2018 +0200

    [FLINK-10366] [s3] Consolidate shared classes for S3 in flink-s3-fs-base
    
    Some classes were previously misplaced in flink-hadoop-fs.
---
 flink-filesystems/flink-s3-fs-base/pom.xml         |  6 ++++
 .../fs/s3/common/AbstractS3FileSystemFactory.java} | 16 +++++----
 .../flink/fs/s3/common/FlinkS3FileSystem.java      | 39 ++++++++++++++++++++++
 .../flink/fs/s3/common}/HadoopConfigLoader.java    |  9 +++--
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |  6 ++--
 .../flink/fs/s3hadoop/HadoopS3FileSystemTest.java  |  2 +-
 .../flink/fs/s3presto/S3FileSystemFactory.java     |  6 ++--
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |  8 ++---
 8 files changed, 71 insertions(+), 21 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml
index c1e30ac..6cc877d 100644
--- a/flink-filesystems/flink-s3-fs-base/pom.xml
+++ b/flink-filesystems/flink-s3-fs-base/pom.xml
@@ -155,6 +155,12 @@ under the License.
 									<pattern>com.google</pattern>
 									<shadedPattern>org.apache.flink.fs.s3base.shaded.com.google</shadedPattern>
 								</relocation>
+
+								<!-- shade Flink's Hadoop FS adapter classes  -->
+								<relocation>
+									<pattern>org.apache.flink.runtime.fs.hdfs</pattern>
+									<shadedPattern>org.apache.flink.fs.s3.common.hadoop</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
similarity index 83%
rename from flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 63919ba..ed24138 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.fs.hdfs;
+package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
@@ -29,15 +29,16 @@ import java.io.IOException;
 import java.net.URI;
 
 /** Base class for Hadoop file system factories. */
-public abstract class AbstractFileSystemFactory implements FileSystemFactory {
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystemFactory.class);
+public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractS3FileSystemFactory.class);
 
 	/** Name of this factory for logging. */
 	private final String name;
 
 	private final HadoopConfigLoader hadoopConfigLoader;
 
-	protected AbstractFileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
+	protected AbstractS3FileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
 		this.name = name;
 		this.hadoopConfigLoader = hadoopConfigLoader;
 	}
@@ -49,13 +50,14 @@ public abstract class AbstractFileSystemFactory implements FileSystemFactory {
 
 	@Override
 	public FileSystem create(URI fsUri) throws IOException {
-		LOG.debug("Creating Hadoop file system (backed by " + name + ")");
-		LOG.debug("Loading Hadoop configuration for " + name);
+		LOG.debug("Creating S3 file system backed by {}", name);
+		LOG.debug("Loading Hadoop configuration for {}", name);
+
 		try {
 			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
 			org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
 			fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
-			return new HadoopFileSystem(fs);
+			return new FlinkS3FileSystem(fs);
 		} catch (IOException ioe) {
 			throw ioe;
 		} catch (Exception e) {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
new file mode 100644
index 0000000..a3d960a
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common;
+
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for S3.
+ * This class implements the common behavior implemented directly by Flink and delegates
+ * common calls to an implementation of Hadoop's filesystem abstraction.
+ */
+public class FlinkS3FileSystem extends HadoopFileSystem {
+
+	/**
+	 * Wraps the given Hadoop S3 File System object as a Flink S3 File System object.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * @param hadoopS3FileSystem The Hadoop FileSystem that will be used under the hood.
+	 */
+	public FlinkS3FileSystem(org.apache.hadoop.fs.FileSystem hadoopS3FileSystem) {
+		super(hadoopS3FileSystem);
+	}
+}
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
similarity index 95%
rename from flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
index 82916f3..5ca497c 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.fs.hdfs;
+package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
 
@@ -27,8 +27,11 @@ import javax.annotation.Nonnull;
 
 import java.util.Set;
 
-/** This class lazily loads hadoop configuration from resettable Flink's configuration. */
+/**
+ * This class lazily loads hadoop configuration from resettable Flink's configuration.
+ */
 public class HadoopConfigLoader {
+
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopConfigLoader.class);
 
 	/** The prefixes that Flink adds to the Hadoop fs config. */
@@ -80,7 +83,7 @@ public class HadoopConfigLoader {
 			}
 			else {
 				LOG.warn("Flink configuration is not set prior to loading this configuration."
-					+ " Using Hadoop configuration from the classpath.");
+					+ " Cannot forward configuration keys from Flink configuration.");
 				hadoopConfig = new org.apache.hadoop.conf.Configuration();
 			}
 		}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 314d34c..2b46dbd 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -19,8 +19,8 @@
 package org.apache.flink.fs.s3hadoop;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.slf4j.Logger;
@@ -33,7 +33,7 @@ import java.util.Set;
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
 	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
 
 	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.singleton("com.amazonaws.");
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index 647a937..6faf5b2 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3hadoop;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 
 import org.junit.Test;
 
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index d4ad561..4b9db97 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,8 +19,8 @@
 package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
@@ -33,7 +33,7 @@ import java.util.Set;
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
 
 	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.singleton("com.amazonaws.");
 
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 4eeb2d4..e7fb8fe 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
@@ -95,9 +95,9 @@ public class PrestoS3FileSystemTest {
 	// ------------------------------------------------------------------------
 
 	private static void validateBasicCredentials(FileSystem fs) throws Exception {
-		assertTrue(fs instanceof HadoopFileSystem);
+		assertTrue(fs instanceof FlinkS3FileSystem);
 
-		org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) fs).getHadoopFileSystem();
+		org.apache.hadoop.fs.FileSystem hadoopFs = ((FlinkS3FileSystem) fs).getHadoopFileSystem();
 		assertTrue(hadoopFs instanceof PrestoS3FileSystem);
 
 		try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {