Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/28 15:27:23 UTC
[flink] 02/04: [FLINK-12115][fs] Add support for AzureFS
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e00ec88601583d370e14d7d969b20ab1cbc6ce3e
Author: Piyush Narang <p....@criteo.com>
AuthorDate: Thu Apr 4 16:00:46 2019 -0400
[FLINK-12115][fs] Add support for AzureFS
Check for http enabled storage accounts in AzureFS IT tests
Add AzureFS standalone E2E test
---
docs/ops/filesystems/azure.md | 77 ++++++++
docs/ops/filesystems/azure.zh.md | 77 ++++++++
docs/ops/filesystems/index.md | 9 +-
docs/ops/filesystems/index.zh.md | 7 +-
flink-dist/src/main/assemblies/opt.xml | 7 +
.../test-scripts/test_azure_fs.sh | 83 ++++++++
.../pom.xml | 78 +++++---
.../flink/fs/azurefs/AbstractAzureFSFactory.java | 85 ++++++++
.../apache/flink/fs/azurefs/AzureFSFactory.java | 30 +++
.../flink/fs/azurefs/SecureAzureFSFactory.java | 30 +++
.../org.apache.flink.core.fs.FileSystemFactory | 17 ++
.../flink/fs/azurefs/AzureFSFactoryTest.java | 94 +++++++++
.../fs/azurefs/AzureFileSystemBehaviorITCase.java | 220 +++++++++++++++++++++
.../src/test/resources/log4j-test.properties | 27 +++
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 4 +-
.../flink/runtime/util}/HadoopConfigLoader.java | 2 +-
flink-filesystems/flink-s3-fs-base/pom.xml | 2 +-
.../fs/s3/common/AbstractS3FileSystemFactory.java | 1 +
.../flink/fs/s3/common/S3EntropyFsFactoryTest.java | 1 +
flink-filesystems/flink-s3-fs-hadoop/pom.xml | 5 +
.../flink/fs/s3hadoop/S3FileSystemFactory.java | 2 +-
.../flink/fs/s3hadoop/HadoopS3FileSystemTest.java | 2 +-
flink-filesystems/flink-s3-fs-presto/pom.xml | 6 +
.../flink/fs/s3presto/S3FileSystemFactory.java | 2 +-
.../flink/fs/s3presto/PrestoS3FileSystemTest.java | 2 +-
flink-filesystems/pom.xml | 1 +
26 files changed, 836 insertions(+), 35 deletions(-)
diff --git a/docs/ops/filesystems/azure.md b/docs/ops/filesystems/azure.md
new file mode 100644
index 0000000..36720c8
--- /dev/null
+++ b/docs/ops/filesystems/azure.md
@@ -0,0 +1,77 @@
+---
+title: "Azure Blob Storage"
+nav-title: Azure Blob Storage
+nav-parent_id: filesystems
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
+You can use Azure Blob Storage with Flink for **reading** and **writing data** as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+You can use Azure Blob Storage objects like regular files by specifying paths in the following format:
+
+{% highlight plain %}
+wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+
+// SSL encrypted access
+wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+{% endhighlight %}
+
+The following shows how to use Azure Blob Storage with Flink:
+
+{% highlight java %}
+// Read from Azure Blob Storage
+env.readTextFile("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Write to Azure Blob Storage
+stream.writeAsText("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Use Azure Blob Storage as the FsStateBackend
+env.setStateBackend(new FsStateBackend("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>"));
+{% endhighlight %}
+
+### Shaded Hadoop Azure Blob Storage file system
+
+To use `flink-azure-fs-hadoop`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
+
+{% highlight bash %}
+cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
+{% endhighlight %}
+
+`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) schemes.
+
+#### Configuring credentials
+After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage.
+
+To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`.
+
+You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
+
+The following configuration must be added to `flink-conf.yaml`:
+
+{% highlight yaml %}
+fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key
+{% endhighlight %}
+
+{% top %}
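Putting the documentation snippets above together, a minimal job skeleton could look like the sketch below. This is illustrative only: the container, account, and object paths are placeholders, and it assumes the shaded flink-azure-fs-hadoop jar has been copied to lib/ and the access key has been configured in flink-conf.yaml as described above.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AzureBlobStorageJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints are written to Azure Blob Storage via the SSL-encrypted wasbs:// scheme.
        env.setStateBackend(new FsStateBackend(
            "wasbs://mycontainer@myaccount.blob.core.windows.net/checkpoints"));

        // Read input from, and write output to, the same storage account.
        env.readTextFile("wasbs://mycontainer@myaccount.blob.core.windows.net/input/words")
            .writeAsText("wasbs://mycontainer@myaccount.blob.core.windows.net/output/wordcount");

        env.execute("Azure Blob Storage sketch");
    }
}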
diff --git a/docs/ops/filesystems/azure.zh.md b/docs/ops/filesystems/azure.zh.md
new file mode 100644
index 0000000..36720c8
--- /dev/null
+++ b/docs/ops/filesystems/azure.zh.md
@@ -0,0 +1,77 @@
+---
+title: "Azure Blob Storage"
+nav-title: Azure Blob Storage
+nav-parent_id: filesystems
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
+You can use Azure Blob Storage with Flink for **reading** and **writing data** as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+You can use Azure Blob Storage objects like regular files by specifying paths in the following format:
+
+{% highlight plain %}
+wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+
+// SSL encrypted access
+wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+{% endhighlight %}
+
+The following shows how to use Azure Blob Storage with Flink:
+
+{% highlight java %}
+// Read from Azure Blob Storage
+env.readTextFile("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Write to Azure Blob Storage
+stream.writeAsText("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Use Azure Blob Storage as the FsStateBackend
+env.setStateBackend(new FsStateBackend("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>"));
+{% endhighlight %}
+
+### Shaded Hadoop Azure Blob Storage file system
+
+To use `flink-azure-fs-hadoop`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
+
+{% highlight bash %}
+cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
+{% endhighlight %}
+
+`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) schemes.
+
+#### Configuring credentials
+After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage.
+
+To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`.
+
+You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
+
+The following configuration must be added to `flink-conf.yaml`:
+
+{% highlight yaml %}
+fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key
+{% endhighlight %}
+
+{% top %}
diff --git a/docs/ops/filesystems/index.md b/docs/ops/filesystems/index.md
index 0d4a1be..eb4087d 100644
--- a/docs/ops/filesystems/index.md
+++ b/docs/ops/filesystems/index.md
@@ -25,7 +25,7 @@ under the License.
-->
Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
-These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*.
+These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*.
The file system used for a particular file is determined by its URI scheme.
For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
@@ -43,12 +43,17 @@ Flink ships with implementations for the following file systems:
- **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint.
- - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
+ - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
- **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*.
The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`
When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+
+ - **Azure Blob Storage**:
+ Flink directly provides a file system to work with Azure Blob Storage.
+ This filesystem is registered under the scheme *"wasb(s)://"*.
+ The implementation is self-contained with no dependency footprint.
## HDFS and Hadoop File System support
diff --git a/docs/ops/filesystems/index.zh.md b/docs/ops/filesystems/index.zh.md
index 0d4a1be..414c82f 100644
--- a/docs/ops/filesystems/index.zh.md
+++ b/docs/ops/filesystems/index.zh.md
@@ -25,7 +25,7 @@ under the License.
-->
Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
-These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*.
+These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*.
The file system used for a particular file is determined by its URI scheme.
For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
@@ -49,6 +49,11 @@ Flink ships with implementations for the following file systems:
The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`
When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+
+ - **Azure Blob Storage**:
+ Flink directly provides a file system to work with Azure Blob Storage.
+ This filesystem is registered under the scheme *"wasb(s)://"*.
+ The implementation is self-contained with no dependency footprint.
## HDFS and Hadoop File System support
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 1ce6e88..e28acd8 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -154,6 +154,13 @@
<fileMode>0644</fileMode>
</file>
+ <file>
+ <source>../flink-filesystems/flink-azure-fs-hadoop/target/flink-azure-fs-hadoop-${project.version}.jar</source>
+ <outputDirectory>opt/</outputDirectory>
+ <destName>flink-azure-fs-hadoop-${project.version}.jar</destName>
+ <fileMode>0644</fileMode>
+ </file>
+
<!-- Queryable State -->
<file>
<source>../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar</source>
diff --git a/flink-end-to-end-tests/test-scripts/test_azure_fs.sh b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh
new file mode 100755
index 0000000..40c6962
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tests for Azure file system.
+
+# To run a single test, export IT_CASE_AZURE_ACCOUNT, IT_CASE_AZURE_ACCESS_KEY, IT_CASE_AZURE_CONTAINER to
+# the appropriate values and run:
+# flink-end-to-end-tests/run-single-test.sh skip flink-end-to-end-tests/test-scripts/test_azure_fs.sh
+
+source "$(dirname "$0")"/common.sh
+
+if [[ -z "$IT_CASE_AZURE_ACCOUNT" ]]; then
+ echo "Did not find Azure storage account environment variable, NOT running the e2e test."
+ exit 0
+else
+ echo "Found Azure storage account $IT_CASE_AZURE_ACCOUNT, running the e2e test."
+fi
+
+if [[ -z "$IT_CASE_AZURE_ACCESS_KEY" ]]; then
+ echo "Did not find Azure storage access key environment variable, NOT running the e2e test."
+ exit 0
+else
+ echo "Found Azure storage access key $IT_CASE_AZURE_ACCESS_KEY, running the e2e test."
+fi
+
+if [[ -z "$IT_CASE_AZURE_CONTAINER" ]]; then
+ echo "Did not find Azure storage container environment variable, NOT running the e2e test."
+ exit 0
+else
+ echo "Found Azure storage container $IT_CASE_AZURE_CONTAINER, running the e2e test."
+fi
+
+AZURE_TEST_DATA_WORDS_URI="wasbs://$IT_CASE_AZURE_CONTAINER@$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net/words"
+
+###################################
+# Setup Flink Azure access.
+#
+# Globals:
+# FLINK_DIR
+# IT_CASE_AZURE_ACCOUNT
+# IT_CASE_AZURE_ACCESS_KEY
+# Returns:
+# None
+###################################
+function azure_setup {
+ # make sure we delete the file at the end
+ function azure_cleanup {
+ rm $FLINK_DIR/lib/flink-azure-fs*.jar
+
+ # remove any leftover settings
+ sed -i -e 's/fs.azure.account.key.*//' "$FLINK_DIR/conf/flink-conf.yaml"
+ }
+ trap azure_cleanup EXIT
+
+ echo "Copying flink azure jars and writing out configs"
+ cp $FLINK_DIR/opt/flink-azure-fs-hadoop-*.jar $FLINK_DIR/lib/
+ echo "fs.azure.account.key.$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net: $IT_CASE_AZURE_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+}
+
+azure_setup
+
+echo "Starting Flink cluster.."
+start_cluster
+
+$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input $AZURE_TEST_DATA_WORDS_URI --output $TEST_DATA_DIR/out/wc_out
+
+check_result_hash "WordCountWithAzureFS" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
similarity index 57%
copy from flink-filesystems/flink-s3-fs-hadoop/pom.xml
copy to flink-filesystems/flink-azure-fs-hadoop/pom.xml
index 9a5a80c..37567ce 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
@@ -27,14 +27,20 @@ under the License.
<relativePath>..</relativePath>
</parent>
- <artifactId>flink-s3-fs-hadoop</artifactId>
- <name>flink-s3-fs-hadoop</name>
+ <artifactId>flink-azure-fs-hadoop</artifactId>
+ <name>flink-azure-fs-hadoop</name>
<packaging>jar</packaging>
+ <!-- need to use a release which includes this patch: https://github.com/apache/hadoop/commit/02cadbd24bf69925078d044701741e2e3fcb4b2f -->
+ <properties>
+ <fs.azure.version>2.7.0</fs.azure.version>
+ <fs.azure.sdk.version>1.16.0</fs.azure.sdk.version>
+ <fs.jackson.core.version>2.9.4</fs.jackson.core.version>
+ </properties>
+
<dependencies>
- <!-- Flink's file system abstraction (compiled against, not bundled) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
@@ -42,42 +48,59 @@ under the License.
<scope>provided</scope>
</dependency>
- <!-- S3 base -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-s3-fs-base</artifactId>
+ <artifactId>flink-hadoop-fs</artifactId>
<version>${project.version}</version>
</dependency>
- <!-- for the behavior test suite -->
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure</artifactId>
+ <version>${fs.azure.version}</version>
</dependency>
- <!-- for the settings that make unshaded tests run -->
+ <!-- for the Azure HDFS related tests -->
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-fs-hadoop-shaded</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <!-- for Azure IT tests to check if HTTP endpoints are enabled / not -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure</artifactId>
+ <version>${fs.azure.sdk.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${fs.jackson.core.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
<scope>test</scope>
- <type>test-jar</type>
</dependency>
+ <!-- for the behavior test suite -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-fs</artifactId>
+ <artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
+
</dependencies>
<build>
<plugins>
+ <!-- Relocate all Azure related classes -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@@ -96,22 +119,30 @@ under the License.
</includes>
</artifactSet>
<relocations>
- <!-- relocate the references to Hadoop to match the shaded Hadoop config -->
<relocation>
<pattern>org.apache.hadoop</pattern>
- <shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
+ <shadedPattern>org.apache.flink.fs.shaded.hadoop.org.apache.hadoop</shadedPattern>
</relocation>
- <!-- relocate the AWS dependencies -->
+ <!-- relocate the azure-storage dependencies -->
<relocation>
- <pattern>com.amazon</pattern>
- <shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
+ <pattern>com.microsoft.azure.storage</pattern>
+ <shadedPattern>org.apache.flink.fs.shaded.com.microsoft.azure.storage</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<artifact>*</artifact>
<excludes>
- <exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
+ <exclude>properties.dtd</exclude>
+ <exclude>PropertyList-1.0.dtd</exclude>
+ <exclude>mozilla/**</exclude>
+ <exclude>META-INF/maven/**</exclude>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>core-default.xml</exclude>
+ <exclude>hdfs-default.xml</exclude>
</excludes>
</filter>
</filters>
@@ -121,5 +152,4 @@ under the License.
</plugin>
</plugins>
</build>
-
</project>
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
new file mode 100644
index 0000000..7ae9df8
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract factory for AzureFS. Subclasses override to specify
+ * the correct scheme (wasb / wasbs). Based on Azure HDFS support in the
+ * <a href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module.
+ */
+public abstract class AbstractAzureFSFactory implements FileSystemFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(AzureFSFactory.class);
+
+ private static final String[] FLINK_CONFIG_PREFIXES = { "fs.azure.", "azure." };
+ private static final String HADOOP_CONFIG_PREFIX = "fs.azure.";
+
+ private static final String[][] MIRRORED_CONFIG_KEYS = {};
+ private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.emptySet();
+ private static final Set<String> CONFIG_KEYS_TO_SHADE = Collections.emptySet();
+ private static final String FLINK_SHADING_PREFIX = "";
+
+ private final HadoopConfigLoader configLoader;
+
+ private Configuration flinkConfig;
+
+ public AbstractAzureFSFactory() {
+ this.configLoader = new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+ HADOOP_CONFIG_PREFIX, PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
+ }
+
+ @Override
+ public void configure(Configuration config) {
+ flinkConfig = config;
+ configLoader.setFlinkConfig(config);
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ checkNotNull(fsUri, "passed file system URI object should not be null");
+ LOG.info("Trying to load and instantiate Azure File System");
+ return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
+ }
+
+ // uri is of the form: wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDir
+ private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI fsUri, Configuration flinkConfig) throws IOException {
+ org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();
+
+ org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
+ azureFS.initialize(fsUri, hadoopConfig);
+
+ return azureFS;
+ }
+}
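For context on how this factory is exercised, the sketch below shows the typical resolution path from user code; it mirrors what the IT case further down does. The account, container, and key values are placeholders, not values from this commit.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class AzureFsResolutionSketch {

    public static void main(String[] args) throws Exception {
        // Hand the account key to Flink's file system layer once,
        // mirroring what an entry in flink-conf.yaml would provide.
        Configuration conf = new Configuration();
        conf.setString("fs.azure.account.key.myaccount.blob.core.windows.net", "<access-key>");
        FileSystem.initialize(conf);

        // getFileSystem() looks up the factory registered for the "wasbs" scheme
        // (SecureAzureFSFactory) and delegates creation to it, which in turn
        // wraps an initialized NativeAzureFileSystem in a HadoopFileSystem.
        Path path = new Path("wasbs://mycontainer@myaccount.blob.core.windows.net/data/words");
        FileSystem fs = path.getFileSystem();
        System.out.println("exists: " + fs.exists(path));
    }
}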
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java
new file mode 100644
index 0000000..5f6246d
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+/**
+ * A factory for the Azure file system over HTTP.
+ */
+public class AzureFSFactory extends AbstractAzureFSFactory {
+
+ @Override
+ public String getScheme() {
+ return "wasb";
+ }
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java
new file mode 100644
index 0000000..7130a87
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+/**
+ * A factory for the Azure file system over HTTPS.
+ */
+public class SecureAzureFSFactory extends AbstractAzureFSFactory {
+
+ @Override
+ public String getScheme() {
+ return "wasbs";
+ }
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..4d6a19a
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.fs.azurefs.AzureFSFactory
+org.apache.flink.fs.azurefs.SecureAzureFSFactory
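The file above is a standard java.util.ServiceLoader registration; Flink's FileSystem class discovers the factories on the classpath and indexes them by scheme. A rough illustration of that discovery (not code from this commit):

import org.apache.flink.core.fs.FileSystemFactory;

import java.util.ServiceLoader;

public class FactoryDiscoverySketch {

    public static void main(String[] args) {
        // With flink-azure-fs-hadoop on the classpath this prints, among others:
        //   wasb  -> org.apache.flink.fs.azurefs.AzureFSFactory
        //   wasbs -> org.apache.flink.fs.azurefs.SecureAzureFSFactory
        for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
            System.out.println(factory.getScheme() + " -> " + factory.getClass().getName());
        }
    }
}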
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java
new file mode 100644
index 0000000..01b79b5
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.fs.azure.AzureException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests for the AzureFSFactory.
+ */
+@RunWith(Parameterized.class)
+public class AzureFSFactoryTest extends TestLogger {
+
+ @Parameterized.Parameter
+ public String scheme;
+
+ @Parameterized.Parameters(name = "Scheme = {0}")
+ public static List<String> parameters() {
+ return Arrays.asList("wasb", "wasbs");
+ }
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ private AbstractAzureFSFactory getFactory(String scheme) {
+ return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory();
+ }
+
+ @Test
+ public void testNullFsURI() throws Exception {
+ URI uri = null;
+ AbstractAzureFSFactory factory = getFactory(scheme);
+
+ exception.expect(NullPointerException.class);
+ exception.expectMessage("passed file system URI object should not be null");
+
+ factory.create(uri);
+ }
+
+ // missing credentials
+ @Test
+ public void testCreateFsWithAuthorityMissingCreds() throws Exception {
+ String uriString = String.format("%s://yourcontainer@youraccount.blob.core.windows.net/testDir", scheme);
+ final URI uri = URI.create(uriString);
+
+ exception.expect(AzureException.class);
+
+ AbstractAzureFSFactory factory = getFactory(scheme);
+ Configuration config = new Configuration();
+ config.setInteger("fs.azure.io.retry.max.retries", 0);
+ factory.configure(config);
+ factory.create(uri);
+ }
+
+ @Test
+ public void testCreateFsWithMissingAuthority() throws Exception {
+ String uriString = String.format("%s:///my/path", scheme);
+ final URI uri = URI.create(uriString);
+
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Cannot initialize WASB file system, URI authority not recognized.");
+
+ AbstractAzureFSFactory factory = getFactory(scheme);
+ factory.configure(new Configuration());
+ factory.create(uri);
+ }
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
new file mode 100644
index 0000000..6c65be9
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import com.microsoft.azure.credentials.ApplicationTokenCredentials;
+import com.microsoft.azure.credentials.AzureTokenCredentials;
+import com.microsoft.azure.management.Azure;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for Azure based
+ * file system.
+ */
+@RunWith(Parameterized.class)
+public class AzureFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+ @Parameterized.Parameter
+ public String scheme;
+
+ private static final String CONTAINER = System.getenv("ARTIFACTS_AZURE_CONTAINER");
+ private static final String ACCOUNT = System.getenv("ARTIFACTS_AZURE_STORAGE_ACCOUNT");
+ private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AZURE_ACCESS_KEY");
+ private static final String RESOURCE_GROUP = System.getenv("ARTIFACTS_AZURE_RESOURCE_GROUP");
+ private static final String SUBSCRIPTION_ID = System.getenv("ARTIFACTS_AZURE_SUBSCRIPTION_ID");
+ private static final String TOKEN_CREDENTIALS_FILE = System.getenv("ARTIFACTS_AZURE_TOKEN_CREDENTIALS_FILE");
+
+ private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+ // Azure Blob Storage defaults to https only storage accounts. We check if http support has been
+ // enabled on a best effort basis and test http if so.
+ @Parameterized.Parameters(name = "Scheme = {0}")
+ public static List<String> parameters() throws IOException {
+ boolean httpsOnly = isHttpsTrafficOnly();
+ return httpsOnly ? Arrays.asList("wasbs") : Arrays.asList("wasb", "wasbs");
+ }
+
+ private static boolean isHttpsTrafficOnly() throws IOException {
+ if (StringUtils.isNullOrWhitespaceOnly(RESOURCE_GROUP) || StringUtils.isNullOrWhitespaceOnly(TOKEN_CREDENTIALS_FILE)) {
+ // default to https only, as some fields are missing
+ return true;
+ }
+
+ Assume.assumeTrue("Azure storage account not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCOUNT));
+
+ AzureTokenCredentials credentials = ApplicationTokenCredentials.fromFile(new File(TOKEN_CREDENTIALS_FILE));
+ Azure azure =
+ StringUtils.isNullOrWhitespaceOnly(SUBSCRIPTION_ID) ?
+ Azure.authenticate(credentials).withDefaultSubscription() :
+ Azure.authenticate(credentials).withSubscription(SUBSCRIPTION_ID);
+
+ return azure.storageAccounts().getByResourceGroup(RESOURCE_GROUP, ACCOUNT).inner().enableHttpsTrafficOnly();
+ }
+
+ @BeforeClass
+ public static void checkCredentialsAndSetup() throws IOException {
+ // check whether credentials and container details exist
+ Assume.assumeTrue("Azure container not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(CONTAINER));
+ Assume.assumeTrue("Azure access key not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
+
+ // initialize configuration with valid credentials
+ final Configuration conf = new Configuration();
+ // fs.azure.account.key.youraccount.blob.core.windows.net = ACCESS_KEY
+ conf.setString("fs.azure.account.key." + ACCOUNT + ".blob.core.windows.net", ACCESS_KEY);
+ FileSystem.initialize(conf);
+ }
+
+ @AfterClass
+ public static void clearFsConfig() throws IOException {
+ FileSystem.initialize(new Configuration());
+ }
+
+ @Override
+ public FileSystem getFileSystem() throws Exception {
+ return getBasePath().getFileSystem();
+ }
+
+ @Override
+ public Path getBasePath() {
+ // wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDataDir
+ String uriString = scheme + "://" + CONTAINER + '@' + ACCOUNT + ".blob.core.windows.net/" + TEST_DATA_DIR;
+ return new Path(uriString);
+ }
+
+ @Test
+ public void testSimpleFileWriteAndRead() throws Exception {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
+
+ final String testLine = "Hello Upload!";
+
+ final Path path = new Path(getBasePath() + "/test.txt");
+ final FileSystem fs = path.getFileSystem();
+
+ try {
+ try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE);
+ OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+ writer.write(testLine);
+ }
+
+ // just in case, wait for the path to exist
+ checkPathEventualExistence(fs, path, true, deadline);
+
+ try (FSDataInputStream in = fs.open(path);
+ InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
+ BufferedReader reader = new BufferedReader(ir)) {
+ String line = reader.readLine();
+ assertEquals(testLine, line);
+ }
+ }
+ finally {
+ fs.delete(path, false);
+ }
+
+ // now file must be gone
+ checkPathEventualExistence(fs, path, false, deadline);
+ }
+
+ @Test
+ public void testDirectoryListing() throws Exception {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
+
+ final Path directory = new Path(getBasePath() + "/testdir/");
+ final FileSystem fs = directory.getFileSystem();
+
+ // directory must not yet exist
+ assertFalse(fs.exists(directory));
+
+ try {
+ // create directory
+ assertTrue(fs.mkdirs(directory));
+
+ checkPathEventualExistence(fs, directory, true, deadline);
+
+ // directory empty
+ assertEquals(0, fs.listStatus(directory).length);
+
+ // create some files
+ final int numFiles = 3;
+ for (int i = 0; i < numFiles; i++) {
+ Path file = new Path(directory, "/file-" + i);
+ try (FSDataOutputStream out = fs.create(file, FileSystem.WriteMode.OVERWRITE);
+ OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+ writer.write("hello-" + i + "\n");
+ }
+ // just in case, wait for the file to exist (should then also be reflected in the
+ // directory's file list below)
+ checkPathEventualExistence(fs, file, true, deadline);
+ }
+
+ FileStatus[] files = fs.listStatus(directory);
+ assertNotNull(files);
+ assertEquals(3, files.length);
+
+ for (FileStatus status : files) {
+ assertFalse(status.isDir());
+ }
+
+ // now that there are files, the directory must exist
+ assertTrue(fs.exists(directory));
+ }
+ finally {
+ // clean up
+ fs.delete(directory, true);
+ }
+
+ // now directory must be gone
+ checkPathEventualExistence(fs, directory, false, deadline);
+ }
+
+ @Override
+ public FileSystemKind getFileSystemKind() {
+ return FileSystemKind.OBJECT_STORE;
+ }
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 6a5976a..1135e01 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -224,8 +224,8 @@ public class HadoopFileSystem extends FileSystem {
static FileSystemKind getKindForScheme(String scheme) {
scheme = scheme.toLowerCase(Locale.US);
- if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss")) {
- // the Amazon S3 storage or Aliyun OSS storage
+ if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss") || scheme.startsWith("wasb")) {
+ // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
return FileSystemKind.OBJECT_STORE;
}
else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
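A small sketch of the effect of this change; since getKindForScheme is package-private, such a check would have to live in the same package. The expectations follow directly from the branches shown above and are not a test from this commit.

package org.apache.flink.runtime.fs.hdfs;

import org.apache.flink.core.fs.FileSystemKind;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class GetKindForSchemeSketchTest {

    @Test
    public void azureSchemesAreObjectStores() {
        // wasb and wasbs now classify as object stores, like s3 and oss
        assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("wasb"));
        assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("wasbs"));
        // other schemes keep their previous classification
        assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("hdfs"));
    }
}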
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
similarity index 99%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
rename to flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
index 1bbb757..aa8fdfe 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common;
+package org.apache.flink.runtime.util;
import org.apache.flink.configuration.Configuration;
diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml
index 0b640a4..00d4086 100644
--- a/flink-filesystems/flink-s3-fs-base/pom.xml
+++ b/flink-filesystems/flink-s3-fs-base/pom.xml
@@ -166,7 +166,7 @@ under the License.
<filter>
<artifact>org.apache.flink:flink-hadoop-fs</artifact>
<excludes>
- <exclude>org/apache/flink/runtime/util/**</exclude>
+ <exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
</excludes>
</filter>
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index ff575be..a576a96 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index 943de1d..ebf3b67 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.fs.s3.common;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.FileSystem;
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
index 9a5a80c..e7cf95e 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
@@ -106,6 +106,11 @@ under the License.
<pattern>com.amazon</pattern>
<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
</relocation>
+ <!-- shade Flink's Hadoop FS utility classes -->
+ <relocation>
+ <pattern>org.apache.flink.runtime.util</pattern>
+ <shadedPattern>org.apache.flink.fs.s3hadoop.common</shadedPattern>
+ </relocation>
</relocations>
<filters>
<filter>
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 2637e7b..6cad051 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3hadoop;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index 4471b38..57500f3 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.fs.s3hadoop;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.junit.Test;
diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml
index 8f88bbf..3fc8e03 100644
--- a/flink-filesystems/flink-s3-fs-presto/pom.xml
+++ b/flink-filesystems/flink-s3-fs-presto/pom.xml
@@ -290,6 +290,12 @@ under the License.
<pattern>com.google</pattern>
<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.google</shadedPattern>
</relocation>
+
+ <!-- shade Flink's Hadoop FS utility classes -->
+ <relocation>
+ <pattern>org.apache.flink.runtime.util</pattern>
+ <shadedPattern>org.apache.flink.fs.s3presto.common</shadedPattern>
+ </relocation>
</relocations>
<filters>
<filter>
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index c0c1beb..5a1ffee 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 093efc8..f3117a2 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml
index 8da2cea..c84e853 100644
--- a/flink-filesystems/pom.xml
+++ b/flink-filesystems/pom.xml
@@ -47,6 +47,7 @@ under the License.
<module>flink-s3-fs-presto</module>
<module>flink-swift-fs-hadoop</module>
<module>flink-oss-fs-hadoop</module>
+ <module>flink-azure-fs-hadoop</module>
</modules>
<!-- Common dependency setup for all filesystems -->