Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/28 15:27:23 UTC

[flink] 02/04: [FLINK-12115][fs] Add support for AzureFS

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e00ec88601583d370e14d7d969b20ab1cbc6ce3e
Author: Piyush Narang <p....@criteo.com>
AuthorDate: Thu Apr 4 16:00:46 2019 -0400

    [FLINK-12115][fs] Add support for AzureFS
    
    Check for http enabled storage accounts in AzureFS IT tests
    
    Add AzureFS standalone E2E test
---
 docs/ops/filesystems/azure.md                      |  77 ++++++++
 docs/ops/filesystems/azure.zh.md                   |  77 ++++++++
 docs/ops/filesystems/index.md                      |   9 +-
 docs/ops/filesystems/index.zh.md                   |   7 +-
 flink-dist/src/main/assemblies/opt.xml             |   7 +
 .../test-scripts/test_azure_fs.sh                  |  83 ++++++++
 .../pom.xml                                        |  78 +++++---
 .../flink/fs/azurefs/AbstractAzureFSFactory.java   |  85 ++++++++
 .../apache/flink/fs/azurefs/AzureFSFactory.java    |  30 +++
 .../flink/fs/azurefs/SecureAzureFSFactory.java     |  30 +++
 .../org.apache.flink.core.fs.FileSystemFactory     |  17 ++
 .../flink/fs/azurefs/AzureFSFactoryTest.java       |  94 +++++++++
 .../fs/azurefs/AzureFileSystemBehaviorITCase.java  | 220 +++++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |  27 +++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |   4 +-
 .../flink/runtime/util}/HadoopConfigLoader.java    |   2 +-
 flink-filesystems/flink-s3-fs-base/pom.xml         |   2 +-
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |   1 +
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |   1 +
 flink-filesystems/flink-s3-fs-hadoop/pom.xml       |   5 +
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |   2 +-
 .../flink/fs/s3hadoop/HadoopS3FileSystemTest.java  |   2 +-
 flink-filesystems/flink-s3-fs-presto/pom.xml       |   6 +
 .../flink/fs/s3presto/S3FileSystemFactory.java     |   2 +-
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |   2 +-
 flink-filesystems/pom.xml                          |   1 +
 26 files changed, 836 insertions(+), 35 deletions(-)

diff --git a/docs/ops/filesystems/azure.md b/docs/ops/filesystems/azure.md
new file mode 100644
index 0000000..36720c8
--- /dev/null
+++ b/docs/ops/filesystems/azure.md
@@ -0,0 +1,77 @@
+---
+title: "Azure Blob Storage"
+nav-title: Azure Blob Storage
+nav-parent_id: filesystems
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
+You can use Azure Blob Storage with Flink for **reading** and **writing data**, as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+You can use Azure Blob Storage objects like regular files by specifying paths in the following format:
+
+{% highlight plain %}
+wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+
+// SSL encrypted access
+wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+{% endhighlight %}
+
+The following shows how to use Azure Blob Storage with Flink:
+
+{% highlight java %}
+// Read from Azure Blob Storage
+env.readTextFile("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Write to Azure Blob Storage
+stream.writeAsText("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Use Azure Blob Storage as the FsStateBackend
+env.setStateBackend(new FsStateBackend("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>"));
+{% endhighlight %}
+
+### Shaded Hadoop Azure Blob Storage file system 
+
+To use `flink-azure-fs-hadoop`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
+
+{% highlight bash %}
+cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
+{% endhighlight %}
+
+`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) schemes.
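+
+As an aside, here is a minimal sketch (not part of the shipped docs) of resolving such a URI through Flink's `org.apache.flink.core.fs` abstraction once the JAR is in `lib/`; the container and account names are placeholders:
+
+{% highlight java %}
+// the "wasbs" scheme selects the secure Azure factory registered above
+Path path = new Path("wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/data");
+FileSystem fs = path.getFileSystem();
+{% endhighlight %}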
+
+#### Configuration setup
+After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage.
+
+To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`.
+
+You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
+
+At a minimum, the following credential configuration must be added to `flink-conf.yaml`:
+
+{% highlight yaml %}
+fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key
+{% endhighlight %}
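+
+For example, for a hypothetical storage account named `azuretest`, the entry would read:
+
+{% highlight yaml %}
+fs.azure.account.key.azuretest.blob.core.windows.net: <your-access-key>
+{% endhighlight %}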
+
+{% top %} 
diff --git a/docs/ops/filesystems/azure.zh.md b/docs/ops/filesystems/azure.zh.md
new file mode 100644
index 0000000..36720c8
--- /dev/null
+++ b/docs/ops/filesystems/azure.zh.md
@@ -0,0 +1,77 @@
+---
+title: "Azure Blob Storage"
+nav-title: Azure Blob Storage
+nav-parent_id: filesystems
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
+You can use Azure Blob Storage with Flink for **reading** and **writing data**, as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+You can use Azure Blob Storage objects like regular files by specifying paths in the following format:
+
+{% highlight plain %}
+wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+
+// SSL encrypted access
+wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
+{% endhighlight %}
+
+The following shows how to use Azure Blob Storage with Flink:
+
+{% highlight java %}
+// Read from Azure Blob Storage
+env.readTextFile("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Write to Azure Blob Storage
+stream.writeAsText("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Use Azure Blob Storage as the FsStateBackend
+env.setStateBackend(new FsStateBackend("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>"));
+{% endhighlight %}
+
+### Shaded Hadoop Azure Blob Storage file system 
+
+To use `flink-azure-fs-hadoop`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
+
+{% highlight bash %}
+cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
+{% endhighlight %}
+
+`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) schemes.
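+
+As an aside, here is a minimal sketch (not part of the shipped docs) of resolving such a URI through Flink's `org.apache.flink.core.fs` abstraction once the JAR is in `lib/`; the container and account names are placeholders:
+
+{% highlight java %}
+// the "wasbs" scheme selects the secure Azure factory registered above
+Path path = new Path("wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/data");
+FileSystem fs = path.getFileSystem();
+{% endhighlight %}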
+
+#### Configuration setup
+After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage.
+
+To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`.
+
+You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
+
+At a minimum, the following credential configuration must be added to `flink-conf.yaml`:
+
+{% highlight yaml %}
+fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key
+{% endhighlight %}
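+
+For example, for a hypothetical storage account named `azuretest`, the entry would read:
+
+{% highlight yaml %}
+fs.azure.account.key.azuretest.blob.core.windows.net: <your-access-key>
+{% endhighlight %}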
+
+{% top %} 
diff --git a/docs/ops/filesystems/index.md b/docs/ops/filesystems/index.md
index 0d4a1be..eb4087d 100644
--- a/docs/ops/filesystems/index.md
+++ b/docs/ops/filesystems/index.md
@@ -25,7 +25,7 @@ under the License.
 -->
 
 Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
-These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*.
+These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*.
 
 The file system used for a particular file is determined by its URI scheme.
 For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
@@ -43,12 +43,17 @@ Flink ships with implementations for the following file systems:
 
   - **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint.
     
-  - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
+  - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.  
   
   - **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*. 
   The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
  To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`).
   When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+  
+  - **Azure Blob Storage**: 
+    Flink directly provides a file system to work with Azure Blob Storage. 
+    This filesystem is registered under the scheme *"wasb(s)://"*.
+    The implementation is self-contained with no dependency footprint.
 
 ## HDFS and Hadoop File System support 
 
diff --git a/docs/ops/filesystems/index.zh.md b/docs/ops/filesystems/index.zh.md
index 0d4a1be..414c82f 100644
--- a/docs/ops/filesystems/index.zh.md
+++ b/docs/ops/filesystems/index.zh.md
@@ -25,7 +25,7 @@ under the License.
 -->
 
 Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
-These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*.
+These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*.
 
 The file system used for a particular file is determined by its URI scheme.
 For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
@@ -49,6 +49,11 @@ Flink ships with implementations for the following file systems:
   The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
  To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`).
   When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+  
+  - **Azure Blob Storage**: 
+    Flink directly provides a file system to work with Azure Blob Storage. 
+    This filesystem is registered under the scheme *"wasb(s)://"*.
+    The implementation is self-contained with no dependency footprint.
 
 ## HDFS and Hadoop File System support 
 
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 1ce6e88..e28acd8 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -154,6 +154,13 @@
 			<fileMode>0644</fileMode>
 		</file>
 
+		<file>
+			<source>../flink-filesystems/flink-azure-fs-hadoop/target/flink-azure-fs-hadoop-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-azure-fs-hadoop-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
 		<!-- Queryable State -->
 		<file>
 			<source>../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar</source>
diff --git a/flink-end-to-end-tests/test-scripts/test_azure_fs.sh b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh
new file mode 100755
index 0000000..40c6962
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tests for Azure file system.
+
+# To run a single test, export IT_CASE_AZURE_ACCOUNT, IT_CASE_AZURE_ACCESS_KEY, IT_CASE_AZURE_CONTAINER to
+# the appropriate values and run:
+# flink-end-to-end-tests/run-single-test.sh skip flink-end-to-end-tests/test-scripts/test_azure_fs.sh
+
+source "$(dirname "$0")"/common.sh
+
+if [[ -z "$IT_CASE_AZURE_ACCOUNT" ]]; then
+    echo "Did not find Azure storage account environment variable, NOT running the e2e test."
+    exit 0
+else
+    echo "Found Azure storage account $IT_CASE_AZURE_ACCOUNT, running the e2e test."
+fi
+
+if [[ -z "$IT_CASE_AZURE_ACCESS_KEY" ]]; then
+    echo "Did not find Azure storage access key environment variable, NOT running the e2e test."
+    exit 0
+else
+    echo "Found Azure storage access key $IT_CASE_AZURE_ACCESS_KEY, running the e2e test."
+fi
+
+if [[ -z "$IT_CASE_AZURE_CONTAINER" ]]; then
+    echo "Did not find Azure storage container environment variable, NOT running the e2e test."
+    exit 0
+else
+    echo "Found Azure storage container $IT_CASE_AZURE_CONTAINER, running the e2e test."
+fi
+
+AZURE_TEST_DATA_WORDS_URI="wasbs://$IT_CASE_AZURE_CONTAINER@$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net/words"
+
+###################################
+# Setup Flink Azure access.
+#
+# Globals:
+#   FLINK_DIR
+#   IT_CASE_AZURE_ACCOUNT
+#   IT_CASE_AZURE_ACCESS_KEY
+# Returns:
+#   None
+###################################
+function azure_setup {
+  # make sure we delete the file at the end
+  function azure_cleanup {
+    rm $FLINK_DIR/lib/flink-azure-fs*.jar
+
+    # remove any leftover settings
+    sed -i -e 's/fs.azure.account.key.*//' "$FLINK_DIR/conf/flink-conf.yaml"
+  }
+  trap azure_cleanup EXIT
+
+  echo "Copying flink azure jars and writing out configs"
+  cp $FLINK_DIR/opt/flink-azure-fs-hadoop-*.jar $FLINK_DIR/lib/
+  echo "fs.azure.account.key.$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net: $IT_CASE_AZURE_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+}
+
+azure_setup
+
+echo "Starting Flink cluster.."
+start_cluster
+
+$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input $AZURE_TEST_DATA_WORDS_URI --output $TEST_DATA_DIR/out/wc_out
+
+check_result_hash "WordCountWithAzureFS" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
similarity index 57%
copy from flink-filesystems/flink-s3-fs-hadoop/pom.xml
copy to flink-filesystems/flink-azure-fs-hadoop/pom.xml
index 9a5a80c..37567ce 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
@@ -27,14 +27,20 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-s3-fs-hadoop</artifactId>
-	<name>flink-s3-fs-hadoop</name>
+	<artifactId>flink-azure-fs-hadoop</artifactId>
+	<name>flink-azure-fs-hadoop</name>
 
 	<packaging>jar</packaging>
 
+	<!-- need to use a release which includes this patch: https://github.com/apache/hadoop/commit/02cadbd24bf69925078d044701741e2e3fcb4b2f -->
+	<properties>
+		<fs.azure.version>2.7.0</fs.azure.version>
+		<fs.azure.sdk.version>1.16.0</fs.azure.sdk.version>
+		<fs.jackson.core.version>2.9.4</fs.jackson.core.version>
+	</properties>
+
 	<dependencies>
 
-		<!-- Flink's file system abstraction (compiled against, not bundled) -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -42,42 +48,59 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
-		<!-- S3 base -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-s3-fs-base</artifactId>
+			<artifactId>flink-hadoop-fs</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
-		<!-- for the behavior test suite -->
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-azure</artifactId>
+			<version>${fs.azure.version}</version>
 		</dependency>
 
-		<!-- for the settings that make unshaded tests run -->
+		<!-- for the Azure HDFS related tests -->
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-fs-hadoop-shaded</artifactId>
-			<version>${project.version}</version>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+
+		<!-- for Azure IT tests to check if HTTP endpoints are enabled / not -->
+		<dependency>
+			<groupId>com.microsoft.azure</groupId>
+			<artifactId>azure</artifactId>
+			<version>${fs.azure.sdk.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-core</artifactId>
+			<version>${fs.jackson.core.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
 			<scope>test</scope>
-			<type>test-jar</type>
 		</dependency>
 
+		<!-- for the behavior test suite -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-fs</artifactId>
+			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+
 	</dependencies>
 
 	<build>
 		<plugins>
+			<!-- Relocate all Azure related classes -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>
@@ -96,22 +119,30 @@ under the License.
 								</includes>
 							</artifactSet>
 							<relocations>
-								<!-- relocate the references to Hadoop to match the shaded Hadoop config -->
 								<relocation>
 									<pattern>org.apache.hadoop</pattern>
-									<shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
+									<shadedPattern>org.apache.flink.fs.shaded.hadoop.org.apache.hadoop</shadedPattern>
 								</relocation>
-								<!-- relocate the AWS dependencies -->
+								<!-- relocate the azure-storage dependencies -->
 								<relocation>
-									<pattern>com.amazon</pattern>
-									<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
+									<pattern>com.microsoft.azure.storage</pattern>
+									<shadedPattern>org.apache.flink.fs.shaded.com.microsoft.azure.storage</shadedPattern>
 								</relocation>
 							</relocations>
 							<filters>
 								<filter>
 									<artifact>*</artifact>
 									<excludes>
-										<exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
+										<exclude>properties.dtd</exclude>
+										<exclude>PropertyList-1.0.dtd</exclude>
+										<exclude>mozilla/**</exclude>
+										<exclude>META-INF/maven/**</exclude>
+										<exclude>META-INF/LICENSE.txt</exclude>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+										<exclude>core-default.xml</exclude>
+										<exclude>hdfs-default.xml</exclude>
 									</excludes>
 								</filter>
 							</filters>
@@ -121,5 +152,4 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
-
 </project>
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
new file mode 100644
index 0000000..7ae9df8
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract factory for AzureFS. Subclasses override to specify
+ * the correct scheme (wasb / wasbs). Based on Azure HDFS support in the
+ * <a href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module.
+ */
+public abstract class AbstractAzureFSFactory implements FileSystemFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractAzureFSFactory.class);
+
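+	// keys with these prefixes in the Flink configuration are mirrored into the
+	// Hadoop configuration under the "fs.azure." prefix by the HadoopConfigLoader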
+	private static final String[] FLINK_CONFIG_PREFIXES = { "fs.azure.", "azure." };
+	private static final String HADOOP_CONFIG_PREFIX = "fs.azure.";
+
+	private static final String[][] MIRRORED_CONFIG_KEYS = {};
+	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.emptySet();
+	private static final Set<String> CONFIG_KEYS_TO_SHADE = Collections.emptySet();
+	private static final String FLINK_SHADING_PREFIX = "";
+
+	private final HadoopConfigLoader configLoader;
+
+	private Configuration flinkConfig;
+
+	public AbstractAzureFSFactory() {
+		this.configLoader = new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+			HADOOP_CONFIG_PREFIX, PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		flinkConfig = config;
+		configLoader.setFlinkConfig(config);
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "passed file system URI object should not be null");
+		LOG.info("Trying to load and instantiate Azure File System");
+		return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
+	}
+
+	// uri is of the form: wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDir
+	private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI fsUri, Configuration flinkConfig) throws IOException {
+		org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();
+
+		org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
+		azureFS.initialize(fsUri, hadoopConfig);
+
+		return azureFS;
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java
new file mode 100644
index 0000000..5f6246d
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+/**
+ * A factory for the Azure file system over HTTP.
+ */
+public class AzureFSFactory extends AbstractAzureFSFactory {
+
+	@Override
+	public String getScheme() {
+		return "wasb";
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java
new file mode 100644
index 0000000..7130a87
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+/**
+ * A factory for the Azure file system over HTTPS.
+ */
+public class SecureAzureFSFactory extends AbstractAzureFSFactory {
+
+	@Override
+	public String getScheme() {
+		return "wasbs";
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..4d6a19a
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
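+# The factories below are discovered at runtime via java.util.ServiceLoader.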
+org.apache.flink.fs.azurefs.AzureFSFactory
+org.apache.flink.fs.azurefs.SecureAzureFSFactory
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java
new file mode 100644
index 0000000..01b79b5
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.fs.azure.AzureException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests for the AzureFSFactory.
+ */
+@RunWith(Parameterized.class)
+public class AzureFSFactoryTest extends TestLogger {
+
+	@Parameterized.Parameter
+	public String scheme;
+
+	@Parameterized.Parameters(name = "Scheme = {0}")
+	public static List<String> parameters() {
+		return Arrays.asList("wasb", "wasbs");
+	}
+
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+	private AbstractAzureFSFactory getFactory(String scheme) {
+		return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory();
+	}
+
+	@Test
+	public void testNullFsURI() throws Exception {
+		URI uri = null;
+		AbstractAzureFSFactory factory = getFactory(scheme);
+
+		exception.expect(NullPointerException.class);
+		exception.expectMessage("passed file system URI object should not be null");
+
+		factory.create(uri);
+	}
+
+	// missing credentials
+	@Test
+	public void testCreateFsWithAuthorityMissingCreds() throws Exception {
+		String uriString = String.format("%s://yourcontainer@youraccount.blob.core.windows.net/testDir", scheme);
+		final URI uri = URI.create(uriString);
+
+		exception.expect(AzureException.class);
+
+		AbstractAzureFSFactory factory = getFactory(scheme);
+		Configuration config = new Configuration();
+		config.setInteger("fs.azure.io.retry.max.retries", 0);
+		factory.configure(config);
+		factory.create(uri);
+	}
+
+	@Test
+	public void testCreateFsWithMissingAuthority() throws Exception {
+		String uriString = String.format("%s:///my/path", scheme);
+		final URI uri = URI.create(uriString);
+
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Cannot initialize WASB file system, URI authority not recognized.");
+
+		AbstractAzureFSFactory factory = getFactory(scheme);
+		factory.configure(new Configuration());
+		factory.create(uri);
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
new file mode 100644
index 0000000..6c65be9
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import com.microsoft.azure.credentials.ApplicationTokenCredentials;
+import com.microsoft.azure.credentials.AzureTokenCredentials;
+import com.microsoft.azure.management.Azure;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for the Azure
+ * based file system.
+ */
+@RunWith(Parameterized.class)
+public class AzureFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+	@Parameterized.Parameter
+	public String scheme;
+
+	private static final String CONTAINER = System.getenv("ARTIFACTS_AZURE_CONTAINER");
+	private static final String ACCOUNT = System.getenv("ARTIFACTS_AZURE_STORAGE_ACCOUNT");
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AZURE_ACCESS_KEY");
+	private static final String RESOURCE_GROUP = System.getenv("ARTIFACTS_AZURE_RESOURCE_GROUP");
+	private static final String SUBSCRIPTION_ID = System.getenv("ARTIFACTS_AZURE_SUBSCRIPTION_ID");
+	private static final String TOKEN_CREDENTIALS_FILE = System.getenv("ARTIFACTS_AZURE_TOKEN_CREDENTIALS_FILE");
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	// Azure Blob Storage defaults to https only storage accounts. We check if http support has been
+	// enabled on a best effort basis and test http if so.
+	@Parameterized.Parameters(name = "Scheme = {0}")
+	public static List<String> parameters() throws IOException {
+		boolean httpsOnly = isHttpsTrafficOnly();
+		return httpsOnly ? Arrays.asList("wasbs") : Arrays.asList("wasb", "wasbs");
+	}
+
+	private static boolean isHttpsTrafficOnly() throws IOException {
+		if (StringUtils.isNullOrWhitespaceOnly(RESOURCE_GROUP) || StringUtils.isNullOrWhitespaceOnly(TOKEN_CREDENTIALS_FILE)) {
+			// default to https only, as some fields are missing
+			return true;
+		}
+
+		Assume.assumeTrue("Azure storage account not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCOUNT));
+
+		AzureTokenCredentials credentials = ApplicationTokenCredentials.fromFile(new File(TOKEN_CREDENTIALS_FILE));
+		Azure azure =
+			StringUtils.isNullOrWhitespaceOnly(SUBSCRIPTION_ID) ?
+				Azure.authenticate(credentials).withDefaultSubscription() :
+				Azure.authenticate(credentials).withSubscription(SUBSCRIPTION_ID);
+
+		return azure.storageAccounts().getByResourceGroup(RESOURCE_GROUP, ACCOUNT).inner().enableHttpsTrafficOnly();
+	}
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials and container details exist
+		Assume.assumeTrue("Azure container not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(CONTAINER));
+		Assume.assumeTrue("Azure access key not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		// fs.azure.account.key.youraccount.blob.core.windows.net = ACCESS_KEY
+		conf.setString("fs.azure.account.key." + ACCOUNT + ".blob.core.windows.net", ACCESS_KEY);
+		FileSystem.initialize(conf);
+	}
+
+	@AfterClass
+	public static void clearFsConfig() throws IOException {
+		FileSystem.initialize(new Configuration());
+	}
+
+	@Override
+	public FileSystem getFileSystem() throws Exception {
+		return getBasePath().getFileSystem();
+	}
+
+	@Override
+	public Path getBasePath() {
+		// wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDataDir
+		String uriString = scheme + "://" + CONTAINER + '@' + ACCOUNT + ".blob.core.windows.net/" + TEST_DATA_DIR;
+		return new Path(uriString);
+	}
+
+	@Test
+	public void testSimpleFileWriteAndRead() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
+
+		final String testLine = "Hello Upload!";
+
+		final Path path = new Path(getBasePath() + "/test.txt");
+		final FileSystem fs = path.getFileSystem();
+
+		try {
+			try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE);
+				OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+				writer.write(testLine);
+			}
+
+			// just in case, wait for the path to exist
+			checkPathEventualExistence(fs, path, true, deadline);
+
+			try (FSDataInputStream in = fs.open(path);
+				InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
+				BufferedReader reader = new BufferedReader(ir)) {
+				String line = reader.readLine();
+				assertEquals(testLine, line);
+			}
+		}
+		finally {
+			fs.delete(path, false);
+		}
+
+		// now file must be gone
+		checkPathEventualExistence(fs, path, false, deadline);
+	}
+
+	@Test
+	public void testDirectoryListing() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
+
+		final Path directory = new Path(getBasePath() + "/testdir/");
+		final FileSystem fs = directory.getFileSystem();
+
+		// directory must not yet exist
+		assertFalse(fs.exists(directory));
+
+		try {
+			// create directory
+			assertTrue(fs.mkdirs(directory));
+
+			checkPathEventualExistence(fs, directory, true, deadline);
+
+			// directory empty
+			assertEquals(0, fs.listStatus(directory).length);
+
+			// create some files
+			final int numFiles = 3;
+			for (int i = 0; i < numFiles; i++) {
+				Path file = new Path(directory, "/file-" + i);
+				try (FSDataOutputStream out = fs.create(file, FileSystem.WriteMode.OVERWRITE);
+					OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+					writer.write("hello-" + i + "\n");
+				}
+				// just in case, wait for the file to exist (should then also be reflected in the
+				// directory's file list below)
+				checkPathEventualExistence(fs, file, true, deadline);
+			}
+
+			FileStatus[] files = fs.listStatus(directory);
+			assertNotNull(files);
+			assertEquals(3, files.length);
+
+			for (FileStatus status : files) {
+				assertFalse(status.isDir());
+			}
+
+			// now that there are files, the directory must exist
+			assertTrue(fs.exists(directory));
+		}
+		finally {
+			// clean up
+			fs.delete(directory, true);
+		}
+
+		// now directory must be gone
+		checkPathEventualExistence(fs, directory, false, deadline);
+	}
+
+	@Override
+	public FileSystemKind getFileSystemKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 6a5976a..1135e01 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -224,8 +224,8 @@ public class HadoopFileSystem extends FileSystem {
 	static FileSystemKind getKindForScheme(String scheme) {
 		scheme = scheme.toLowerCase(Locale.US);
 
-		if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss")) {
-			// the Amazon S3 storage or Aliyun OSS storage
+		if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss") || scheme.startsWith("wasb")) {
+			// the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
 			return FileSystemKind.OBJECT_STORE;
 		}
 		else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
similarity index 99%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
rename to flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
index 1bbb757..aa8fdfe 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.fs.s3.common;
+package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.Configuration;
 
diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml
index 0b640a4..00d4086 100644
--- a/flink-filesystems/flink-s3-fs-base/pom.xml
+++ b/flink-filesystems/flink-s3-fs-base/pom.xml
@@ -166,7 +166,7 @@ under the License.
 								<filter>
 									<artifact>org.apache.flink:flink-hadoop-fs</artifact>
 									<excludes>
-										<exclude>org/apache/flink/runtime/util/**</exclude>
+										<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
 										<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
 									</excludes>
 								</filter>
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index ff575be..a576a96 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index 943de1d..ebf3b67 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
index 9a5a80c..e7cf95e 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
@@ -106,6 +106,11 @@ under the License.
 									<pattern>com.amazon</pattern>
 									<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
 								</relocation>
+								<!-- shade Flink's Hadoop FS utility classes -->
+								<relocation>
+									<pattern>org.apache.flink.runtime.util</pattern>
+									<shadedPattern>org.apache.flink.fs.s3hadoop.common</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 2637e7b..6cad051 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3hadoop;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index 4471b38..57500f3 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3hadoop;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 
 import org.junit.Test;
 
diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml
index 8f88bbf..3fc8e03 100644
--- a/flink-filesystems/flink-s3-fs-presto/pom.xml
+++ b/flink-filesystems/flink-s3-fs-presto/pom.xml
@@ -290,6 +290,12 @@ under the License.
 									<pattern>com.google</pattern>
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.google</shadedPattern>
 								</relocation>
+
+								<!-- shade Flink's Hadoop FS utility classes -->
+								<relocation>
+									<pattern>org.apache.flink.runtime.util</pattern>
+									<shadedPattern>org.apache.flink.fs.s3presto.common</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index c0c1beb..5a1ffee 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.s3.PrestoS3FileSystem;
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 093efc8..f3117a2 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml
index 8da2cea..c84e853 100644
--- a/flink-filesystems/pom.xml
+++ b/flink-filesystems/pom.xml
@@ -47,6 +47,7 @@ under the License.
 		<module>flink-s3-fs-presto</module>
 		<module>flink-swift-fs-hadoop</module>
 		<module>flink-oss-fs-hadoop</module>
+		<module>flink-azure-fs-hadoop</module>
 	</modules>
 
 	<!-- Common dependency setup for all filesystems -->