You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/28 15:27:21 UTC

[flink] branch master updated (b333ddc -> ec2c1a2)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b333ddc  [FLINK-12600][table-planner-blink] Introduce various deterministic rewriting rule, which includes:
     new 6489237  [hotfix] Correct redirect from /ops/deployment/oss.html to /ops/filesystems/oss.html
     new e00ec88  [FLINK-12115][fs] Add support for AzureFS
     new 09dae1f  [FLINK-12115][fs] Decrease dependency footprint of flink-azure-fs-hadoop
     new ec2c1a2  [FLINK-12115][fs] Add NOTICE file for flink-azure-fs-hadoop

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 NOTICE-binary                                      |  22 +++
 docs/ops/filesystems/azure.md                      |  77 ++++++++
 docs/ops/filesystems/azure.zh.md                   |  77 ++++++++
 docs/ops/filesystems/index.md                      |   9 +-
 docs/ops/filesystems/index.zh.md                   |   7 +-
 docs/redirects/oss.md                              |   6 +-
 flink-dist/src/main/assemblies/opt.xml             |   7 +
 .../test-scripts/test_azure_fs.sh                  |  83 ++++++++
 .../pom.xml                                        | 127 ++++++------
 .../flink/fs/azurefs/AbstractAzureFSFactory.java   |  85 ++++++++
 .../apache/flink/fs/azurefs/AzureFSFactory.java}   |   9 +-
 .../flink/fs/azurefs/SecureAzureFSFactory.java}    |   9 +-
 .../src/main/resources/META-INF/NOTICE             |  21 ++
 .../org.apache.flink.core.fs.FileSystemFactory     |   3 +-
 .../flink/fs/azurefs/AzureFSFactoryTest.java       |  94 +++++++++
 .../fs/azurefs/AzureFileSystemBehaviorITCase.java  | 220 +++++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |   0
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |   4 +-
 .../flink/runtime/util}/HadoopConfigLoader.java    |   2 +-
 flink-filesystems/flink-s3-fs-base/pom.xml         |   2 +-
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |   1 +
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |   1 +
 flink-filesystems/flink-s3-fs-hadoop/pom.xml       |   5 +
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |   2 +-
 .../flink/fs/s3hadoop/HadoopS3FileSystemTest.java  |   2 +-
 flink-filesystems/flink-s3-fs-presto/pom.xml       |   6 +
 .../flink/fs/s3presto/S3FileSystemFactory.java     |   2 +-
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |   2 +-
 flink-filesystems/pom.xml                          |   1 +
 29 files changed, 803 insertions(+), 83 deletions(-)
 create mode 100644 docs/ops/filesystems/azure.md
 create mode 100644 docs/ops/filesystems/azure.zh.md
 create mode 100755 flink-end-to-end-tests/test-scripts/test_azure_fs.sh
 copy flink-filesystems/{flink-s3-fs-base => flink-azure-fs-hadoop}/pom.xml (55%)
 create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
 copy flink-filesystems/{flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3AFileSystemFactory.java => flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java} (80%)
 copy flink-filesystems/{flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3AFileSystemFactory.java => flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java} (80%)
 create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/NOTICE
 copy flink-filesystems/{flink-mapr-fs => flink-azure-fs-hadoop}/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory (89%)
 create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java
 create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
 copy flink-filesystems/{flink-swift-fs-hadoop => flink-azure-fs-hadoop}/src/test/resources/log4j-test.properties (100%)
 rename flink-filesystems/{flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common => flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util}/HadoopConfigLoader.java (99%)


[flink] 03/04: [FLINK-12115][fs] Decrease dependency footprint of flink-azure-fs-hadoop

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09dae1ff4380b8584c9fc52b8aac32edc96eaa2e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri May 24 15:32:12 2019 +0200

    [FLINK-12115][fs] Decrease dependency footprint of flink-azure-fs-hadoop
---
 flink-filesystems/flink-azure-fs-hadoop/pom.xml | 92 ++++++++++++++++++-------
 1 file changed, 67 insertions(+), 25 deletions(-)

diff --git a/flink-filesystems/flink-azure-fs-hadoop/pom.xml b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
index 37567ce..781e851 100644
--- a/flink-filesystems/flink-azure-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
@@ -34,7 +34,6 @@ under the License.
 
 	<!-- need to use a release which includes this patch: https://github.com/apache/hadoop/commit/02cadbd24bf69925078d044701741e2e3fcb4b2f -->
 	<properties>
-		<fs.azure.version>2.7.0</fs.azure.version>
 		<fs.azure.sdk.version>1.16.0</fs.azure.sdk.version>
 		<fs.jackson.core.version>2.9.4</fs.jackson.core.version>
 	</properties>
@@ -55,16 +54,21 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-azure</artifactId>
-			<version>${fs.azure.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-fs-hadoop-shaded</artifactId>
+			<version>${project.version}</version>
 		</dependency>
 
-		<!-- for the Azure HDFS related tests -->
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-			<version>${hadoop.version}</version>
+			<artifactId>hadoop-azure</artifactId>
+			<version>${fs.hadoopshaded.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- for Azure IT tests to check if HTTP endpoints are enabled / not -->
@@ -74,18 +78,6 @@ under the License.
 			<version>${fs.azure.sdk.version}</version>
 			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
-			<version>${fs.jackson.core.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-			<scope>test</scope>
-		</dependency>
 
 		<!-- for the behavior test suite -->
 		<dependency>
@@ -119,18 +111,70 @@ under the License.
 								</includes>
 							</artifactSet>
 							<relocations>
+								<!-- relocate the references to Hadoop to match the shaded Hadoop config -->
 								<relocation>
 									<pattern>org.apache.hadoop</pattern>
-									<shadedPattern>org.apache.flink.fs.shaded.hadoop.org.apache.hadoop</shadedPattern>
+									<shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
 								</relocation>
-								<!-- relocate the azure-storage dependencies -->
+
+								<!-- shade dependencies internally used by Hadoop and never exposed downstream -->
+								<relocation>
+									<pattern>org.apache.commons</pattern>
+									<shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.commons</shadedPattern>
+								</relocation>
+
+								<!-- relocate the Azure dependencies -->
 								<relocation>
-									<pattern>com.microsoft.azure.storage</pattern>
-									<shadedPattern>org.apache.flink.fs.shaded.com.microsoft.azure.storage</shadedPattern>
+									<pattern>com.microsoft.azure</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.shaded.com.microsoft.azure</shadedPattern>
+								</relocation>
+
+								<!-- shade dependencies internally used by Azure and never exposed downstream -->
+								<relocation>
+									<pattern>org.apache.httpcomponents</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.shaded.org.apache.httpcomponents</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>commons-logging</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.shaded.commons-logging</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>commons-codec</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.shaded.commons-codec</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.fasterxml</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.shaded.com.fasterxml</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.google</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.shaded.com.google</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>org.eclipse</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.shaded.org.eclipse</shadedPattern>
+								</relocation>
+
+								<!-- shade Flink's Hadoop FS adapter classes  -->
+								<relocation>
+									<pattern>org.apache.flink.runtime.fs.hdfs</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.common.hadoop</shadedPattern>
+								</relocation>
+								<!-- shade Flink's Hadoop FS utility classes -->
+								<relocation>
+									<pattern>org.apache.flink.runtime.util</pattern>
+									<shadedPattern>org.apache.flink.fs.azure.common</shadedPattern>
 								</relocation>
 							</relocations>
 							<filters>
 								<filter>
+									<artifact>org.apache.flink:flink-hadoop-fs</artifact>
+									<excludes>
+										<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
+										<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
+									</excludes>
+								</filter>
+								<filter>
 									<artifact>*</artifact>
 									<excludes>
 										<exclude>properties.dtd</exclude>
@@ -141,8 +185,6 @@ under the License.
 										<exclude>META-INF/*.SF</exclude>
 										<exclude>META-INF/*.DSA</exclude>
 										<exclude>META-INF/*.RSA</exclude>
-										<exclude>core-default.xml</exclude>
-										<exclude>hdfs-default.xml</exclude>
 									</excludes>
 								</filter>
 							</filters>


[flink] 04/04: [FLINK-12115][fs] Add NOTICE file for flink-azure-fs-hadoop

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec2c1a2944274c47bd000c569a903221d9b951f0
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri May 24 15:32:30 2019 +0200

    [FLINK-12115][fs] Add NOTICE file for flink-azure-fs-hadoop
    
    This closes #8537.
    This closes #8117.
---
 NOTICE-binary                                      | 22 ++++++++++++++++++++++
 .../src/main/resources/META-INF/NOTICE             | 21 +++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/NOTICE-binary b/NOTICE-binary
index 2c20a5e..a490777 100644
--- a/NOTICE-binary
+++ b/NOTICE-binary
@@ -305,6 +305,28 @@ See bundled license files for details.
 
 - font-awesome:4.5.0 (Font)
 
+flink-azure-fs-hadoop
+Copyright 2014-2019 The Apache Software Foundation
+
+This project includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.fasterxml.jackson.core:jackson-annotations:2.7.0
+- com.fasterxml.jackson.core:jackson-core:2.7.8
+- com.fasterxml.jackson.core:jackson-databind:2.7.8
+- com.google.guava:guava:11.0.2
+- com.microsoft.azure:azure-keyvault-core:0.8.0
+- com.microsoft.azure:azure-storage:5.4.0
+- commons-codec:commons-codec:1.10
+- commons-logging:commons-logging:1.1.3
+- org.apache.hadoop:hadoop-azure:3.1.0
+- org.apache.httpcomponents:httpclient:4.5.3
+- org.apache.httpcomponents:httpcore:4.4.6
+- org.eclipse.jetty:jetty-util:9.3.19.v20170502
+- org.eclipse.jetty:jetty-util-ajax:9.3.19.v20170502
+
 flink-swift-fs-hadoop
 Copyright 2014-2019 The Apache Software Foundation
 
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..9235626
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,21 @@
+flink-azure-fs-hadoop
+Copyright 2014-2019 The Apache Software Foundation
+
+This project includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.fasterxml.jackson.core:jackson-annotations:2.7.0
+- com.fasterxml.jackson.core:jackson-core:2.7.8
+- com.fasterxml.jackson.core:jackson-databind:2.7.8
+- com.google.guava:guava:11.0.2
+- com.microsoft.azure:azure-keyvault-core:0.8.0
+- com.microsoft.azure:azure-storage:5.4.0
+- commons-codec:commons-codec:1.10
+- commons-logging:commons-logging:1.1.3
+- org.apache.hadoop:hadoop-azure:3.1.0
+- org.apache.httpcomponents:httpclient:4.5.3
+- org.apache.httpcomponents:httpcore:4.4.6
+- org.eclipse.jetty:jetty-util:9.3.19.v20170502
+- org.eclipse.jetty:jetty-util-ajax:9.3.19.v20170502


[flink] 01/04: [hotfix] Correct redirect from /ops/deployment/oss.html to /ops/filesystems/oss.html

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6489237bd64af60c207fcbdd9da51594ea5c00a8
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri May 24 14:39:48 2019 +0200

    [hotfix] Correct redirect from /ops/deployment/oss.html to /ops/filesystems/oss.html
---
 docs/redirects/oss.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/redirects/oss.md b/docs/redirects/oss.md
index 3b34502..b9df3e8 100644
--- a/docs/redirects/oss.md
+++ b/docs/redirects/oss.md
@@ -1,8 +1,8 @@
 ---
 title: "Aliyun Object Storage Service (OSS)"
 layout: redirect
-redirect: /ops/deployment/oss.html
-permalink: /ops/filesystems/oss.html
+redirect: /ops/filesystems/oss.html
+permalink: /ops/deployment/oss.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -21,4 +21,4 @@ software distributed under the License is distributed on an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
\ No newline at end of file
+-->


[flink] 02/04: [FLINK-12115][fs] Add support for AzureFS

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e00ec88601583d370e14d7d969b20ab1cbc6ce3e
Author: Piyush Narang <p....@criteo.com>
AuthorDate: Thu Apr 4 16:00:46 2019 -0400

    [FLINK-12115][fs] Add support for AzureFS
    
    Check for http enabled storage accounts in AzureFS IT tests
    
    Add AzureFS standalone E2E test
---
 docs/ops/filesystems/azure.md                      |  77 ++++++++
 docs/ops/filesystems/azure.zh.md                   |  77 ++++++++
 docs/ops/filesystems/index.md                      |   9 +-
 docs/ops/filesystems/index.zh.md                   |   7 +-
 flink-dist/src/main/assemblies/opt.xml             |   7 +
 .../test-scripts/test_azure_fs.sh                  |  83 ++++++++
 .../pom.xml                                        |  78 +++++---
 .../flink/fs/azurefs/AbstractAzureFSFactory.java   |  85 ++++++++
 .../apache/flink/fs/azurefs/AzureFSFactory.java    |  30 +++
 .../flink/fs/azurefs/SecureAzureFSFactory.java     |  30 +++
 .../org.apache.flink.core.fs.FileSystemFactory     |  17 ++
 .../flink/fs/azurefs/AzureFSFactoryTest.java       |  94 +++++++++
 .../fs/azurefs/AzureFileSystemBehaviorITCase.java  | 220 +++++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |  27 +++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |   4 +-
 .../flink/runtime/util}/HadoopConfigLoader.java    |   2 +-
 flink-filesystems/flink-s3-fs-base/pom.xml         |   2 +-
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |   1 +
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |   1 +
 flink-filesystems/flink-s3-fs-hadoop/pom.xml       |   5 +
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |   2 +-
 .../flink/fs/s3hadoop/HadoopS3FileSystemTest.java  |   2 +-
 flink-filesystems/flink-s3-fs-presto/pom.xml       |   6 +
 .../flink/fs/s3presto/S3FileSystemFactory.java     |   2 +-
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |   2 +-
 flink-filesystems/pom.xml                          |   1 +
 26 files changed, 836 insertions(+), 35 deletions(-)

diff --git a/docs/ops/filesystems/azure.md b/docs/ops/filesystems/azure.md
new file mode 100644
index 0000000..36720c8
--- /dev/null
+++ b/docs/ops/filesystems/azure.md
@@ -0,0 +1,77 @@
+---
+title: "Azure Blob Storage"
+nav-title: Azure Blob Storage
+nav-parent_id: filesystems
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
+You can use Azure Blob Storage with Flink for **reading** and **writing data** as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html)  
+
+* This will be replaced by the TOC
+{:toc}
+
+You can use Azure Blob Storage objects like regular files by specifying paths in the following format:
+
+{% highlight plain %}
+wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>
+
+// SSL encrypted access
+wasbs://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>
+{% endhighlight %}
+
+Below shows how to use Azure Blob Storage with Flink:
+
+{% highlight java %}
+// Read from Azure Blob storage
+env.readTextFile("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Write to Azure Blob storage
+stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>")
+
+// Use Azure Blob Storage as FsStatebackend
+env.setStateBackend(new FsStateBackend("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"));
+{% endhighlight %}
+
+### Shaded Hadoop Azure Blob Storage file system 
+
+To use `flink-azure-fs-hadoop`, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g.
+
+{% highlight bash %}
+cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
+{% endhighlight %}
+
+`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme.
+
+#### Configurations setup
+After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage.
+
+To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`
+
+You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
+
+There are some required configurations that must be added to `flink-conf.yaml`:
+
+{% highlight yaml %}
+fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key
+{% endhighlight %}
+
+{% top %} 
diff --git a/docs/ops/filesystems/azure.zh.md b/docs/ops/filesystems/azure.zh.md
new file mode 100644
index 0000000..36720c8
--- /dev/null
+++ b/docs/ops/filesystems/azure.zh.md
@@ -0,0 +1,77 @@
+---
+title: "Azure Blob Storage"
+nav-title: Azure Blob Storage
+nav-parent_id: filesystems
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
+You can use Azure Blob Storage with Flink for **reading** and **writing data** as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html)  
+
+* This will be replaced by the TOC
+{:toc}
+
+You can use Azure Blob Storage objects like regular files by specifying paths in the following format:
+
+{% highlight plain %}
+wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>
+
+// SSL encrypted access
+wasbs://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>
+{% endhighlight %}
+
+Below shows how to use Azure Blob Storage with Flink:
+
+{% highlight java %}
+// Read from Azure Blob storage
+env.readTextFile("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
+
+// Write to Azure Blob storage
+stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>")
+
+// Use Azure Blob Storage as FsStatebackend
+env.setStateBackend(new FsStateBackend("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"));
+{% endhighlight %}
+
+### Shaded Hadoop Azure Blob Storage file system 
+
+To use `flink-azure-fs-hadoop`, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g.
+
+{% highlight bash %}
+cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
+{% endhighlight %}
+
+`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme.
+
+#### Configurations setup
+After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage.
+
+To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`
+
+You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
+
+There are some required configurations that must be added to `flink-conf.yaml`:
+
+{% highlight yaml %}
+fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key
+{% endhighlight %}
+
+{% top %} 
diff --git a/docs/ops/filesystems/index.md b/docs/ops/filesystems/index.md
index 0d4a1be..eb4087d 100644
--- a/docs/ops/filesystems/index.md
+++ b/docs/ops/filesystems/index.md
@@ -25,7 +25,7 @@ under the License.
 -->
 
 Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
-These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*.
+These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*.
 
 The file system used for a particular file is determined by its URI scheme.
 For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
@@ -43,12 +43,17 @@ Flink ships with implementations for the following file systems:
 
   - **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint.
     
-  - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
+  - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.  
   
   - **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*. 
   The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
   To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`
   When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+  
+  - **Azure Blob Storage**: 
+    Flink directly provides a file system to work with Azure Blob Storage. 
+    This filesystem is registered under the scheme *"wasb(s)://"*.
+    The implementation is self-contained with no dependency footprint.
 
 ## HDFS and Hadoop File System support 
 
diff --git a/docs/ops/filesystems/index.zh.md b/docs/ops/filesystems/index.zh.md
index 0d4a1be..414c82f 100644
--- a/docs/ops/filesystems/index.zh.md
+++ b/docs/ops/filesystems/index.zh.md
@@ -25,7 +25,7 @@ under the License.
 -->
 
 Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
-These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*.
+These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*.
 
 The file system used for a particular file is determined by its URI scheme.
 For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
@@ -49,6 +49,11 @@ Flink ships with implementations for the following file systems:
   The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
   To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`
   When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+  
+  - **Azure Blob Storage**: 
+    Flink directly provides a file system to work with Azure Blob Storage. 
+    This filesystem is registered under the scheme *"wasb(s)://"*.
+    The implementation is self-contained with no dependency footprint.
 
 ## HDFS and Hadoop File System support 
 
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 1ce6e88..e28acd8 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -154,6 +154,13 @@
 			<fileMode>0644</fileMode>
 		</file>
 
+		<file>
+			<source>../flink-filesystems/flink-azure-fs-hadoop/target/flink-azure-fs-hadoop-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-azure-fs-hadoop-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
 		<!-- Queryable State -->
 		<file>
 			<source>../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar</source>
diff --git a/flink-end-to-end-tests/test-scripts/test_azure_fs.sh b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh
new file mode 100755
index 0000000..40c6962
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tests for Azure file system.
+
+# To run single test, export IT_CASE_AZURE_ACCOUNT, IT_CASE_AZURE_ACCESS_KEY, IT_CASE_AZURE_CONTAINER to
+# the appropriate values and run:
+# flink-end-to-end-tests/run-single-test.sh skip flink-end-to-end-tests/test-scripts/test_azure_fs.sh
+
+source "$(dirname "$0")"/common.sh
+
+if [[ -z "$IT_CASE_AZURE_ACCOUNT" ]]; then
+    echo "Did not find Azure storage account environment variable, NOT running the e2e test."
+    exit 0
+else
+    echo "Found Azure storage account $IT_CASE_AZURE_ACCOUNT, running the e2e test."
+fi
+
+if [[ -z "$IT_CASE_AZURE_ACCESS_KEY" ]]; then
+    echo "Did not find Azure storage access key environment variable, NOT running the e2e test."
+    exit 0
+else
+    echo "Found Azure storage access key $IT_CASE_AZURE_ACCESS_KEY, running the e2e test."
+fi
+
+if [[ -z "$IT_CASE_AZURE_CONTAINER" ]]; then
+    echo "Did not find Azure storage container environment variable, NOT running the e2e test."
+    exit 0
+else
+    echo "Found Azure storage container $IT_CASE_AZURE_CONTAINER, running the e2e test."
+fi
+
+AZURE_TEST_DATA_WORDS_URI="wasbs://$IT_CASE_AZURE_CONTAINER@$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net/words"
+
+###################################
+# Setup Flink Azure access.
+#
+# Globals:
+#   FLINK_DIR
+#   IT_CASE_AZURE_ACCOUNT
+#   IT_CASE_AZURE_ACCESS_KEY
+# Returns:
+#   None
+###################################
+function azure_setup {
+  # make sure we delete the file at the end
+  function azure_cleanup {
+    rm $FLINK_DIR/lib/flink-azure-fs*.jar
+
+    # remove any leftover settings
+    sed -i -e 's/fs.azure.account.key.*//' "$FLINK_DIR/conf/flink-conf.yaml"
+  }
+  trap azure_cleanup EXIT
+
+  echo "Copying flink azure jars and writing out configs"
+  cp $FLINK_DIR/opt/flink-azure-fs-hadoop-*.jar $FLINK_DIR/lib/
+  echo "fs.azure.account.key.$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net: $IT_CASE_AZURE_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+}
+
+azure_setup
+
+echo "Starting Flink cluster.."
+start_cluster
+
+$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input $AZURE_TEST_DATA_WORDS_URI --output $TEST_DATA_DIR/out/wc_out
+
+check_result_hash "WordCountWithAzureFS" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
similarity index 57%
copy from flink-filesystems/flink-s3-fs-hadoop/pom.xml
copy to flink-filesystems/flink-azure-fs-hadoop/pom.xml
index 9a5a80c..37567ce 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-azure-fs-hadoop/pom.xml
@@ -27,14 +27,20 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-s3-fs-hadoop</artifactId>
-	<name>flink-s3-fs-hadoop</name>
+	<artifactId>flink-azure-fs-hadoop</artifactId>
+	<name>flink-azure-fs-hadoop</name>
 
 	<packaging>jar</packaging>
 
+	<!-- need to use a release which includes this patch: https://github.com/apache/hadoop/commit/02cadbd24bf69925078d044701741e2e3fcb4b2f -->
+	<properties>
+		<fs.azure.version>2.7.0</fs.azure.version>
+		<fs.azure.sdk.version>1.16.0</fs.azure.sdk.version>
+		<fs.jackson.core.version>2.9.4</fs.jackson.core.version>
+	</properties>
+
 	<dependencies>
 
-		<!-- Flink's file system abstraction (compiled against, not bundled) -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -42,42 +48,59 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
-		<!-- S3 base -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-s3-fs-base</artifactId>
+			<artifactId>flink-hadoop-fs</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
-		<!-- for the behavior test suite -->
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-azure</artifactId>
+			<version>${fs.azure.version}</version>
 		</dependency>
 
-		<!-- for the settings that make unshaded tests run -->
+		<!-- for the Azure HDFS related tests -->
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-fs-hadoop-shaded</artifactId>
-			<version>${project.version}</version>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+
+		<!-- for Azure IT tests to check if HTTP endpoints are enabled / not -->
+		<dependency>
+			<groupId>com.microsoft.azure</groupId>
+			<artifactId>azure</artifactId>
+			<version>${fs.azure.sdk.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-core</artifactId>
+			<version>${fs.jackson.core.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
 			<scope>test</scope>
-			<type>test-jar</type>
 		</dependency>
 
+		<!-- for the behavior test suite -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-fs</artifactId>
+			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+
 	</dependencies>
 
 	<build>
 		<plugins>
+			<!-- Relocate all Azure related classes -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>
@@ -96,22 +119,30 @@ under the License.
 								</includes>
 							</artifactSet>
 							<relocations>
-								<!-- relocate the references to Hadoop to match the shaded Hadoop config -->
 								<relocation>
 									<pattern>org.apache.hadoop</pattern>
-									<shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
+									<shadedPattern>org.apache.flink.fs.shaded.hadoop.org.apache.hadoop</shadedPattern>
 								</relocation>
-								<!-- relocate the AWS dependencies -->
+								<!-- relocate the azure-storage dependencies -->
 								<relocation>
-									<pattern>com.amazon</pattern>
-									<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
+									<pattern>com.microsoft.azure.storage</pattern>
+									<shadedPattern>org.apache.flink.fs.shaded.com.microsoft.azure.storage</shadedPattern>
 								</relocation>
 							</relocations>
 							<filters>
 								<filter>
 									<artifact>*</artifact>
 									<excludes>
-										<exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
+										<exclude>properties.dtd</exclude>
+										<exclude>PropertyList-1.0.dtd</exclude>
+										<exclude>mozilla/**</exclude>
+										<exclude>META-INF/maven/**</exclude>
+										<exclude>META-INF/LICENSE.txt</exclude>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+										<exclude>core-default.xml</exclude>
+										<exclude>hdfs-default.xml</exclude>
 									</excludes>
 								</filter>
 							</filters>
@@ -121,5 +152,4 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
-
 </project>
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
new file mode 100644
index 0000000..7ae9df8
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract factory for AzureFS. Subclasses override to specify
+ * the correct scheme (wasb / wasbs). Based on Azure HDFS support in the
+ * <a href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module.
+ */
+public abstract class AbstractAzureFSFactory implements FileSystemFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractAzureFSFactory.class);
+
+	private static final String[] FLINK_CONFIG_PREFIXES = { "fs.azure.", "azure." };
+	private static final String HADOOP_CONFIG_PREFIX = "fs.azure.";
+
+	private static final String[][] MIRRORED_CONFIG_KEYS = {};
+	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.emptySet();
+	private static final Set<String> CONFIG_KEYS_TO_SHADE = Collections.emptySet();
+	private static final String FLINK_SHADING_PREFIX = "";
+
+	private final HadoopConfigLoader configLoader;
+
+	private Configuration flinkConfig;
+
+	public AbstractAzureFSFactory() {
+		this.configLoader = new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+			HADOOP_CONFIG_PREFIX, PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		flinkConfig = config;
+		configLoader.setFlinkConfig(config);
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "passed file system URI object should not be null");
+		LOG.info("Trying to load and instantiate Azure File System");
+		return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
+	}
+
+	// uri is of the form: wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDir
+	private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI fsUri, Configuration flinkConfig) throws IOException {
+		org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();
+
+		org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
+		azureFS.initialize(fsUri, hadoopConfig);
+
+		return azureFS;
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java
new file mode 100644
index 0000000..5f6246d
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+/**
+ * A factory for the Azure file system over HTTP.
+ */
+public class AzureFSFactory extends AbstractAzureFSFactory {
+
+	@Override
+	public String getScheme() {
+		return "wasb";
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java
new file mode 100644
index 0000000..7130a87
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+/**
+ * A factory for the Azure file system over HTTPS (wasbs scheme).
+ */
+public class SecureAzureFSFactory extends AbstractAzureFSFactory {
+
+	@Override
+	public String getScheme() {
+		return "wasbs";
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..4d6a19a
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.fs.azurefs.AzureFSFactory
+org.apache.flink.fs.azurefs.SecureAzureFSFactory
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java
new file mode 100644
index 0000000..01b79b5
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.fs.azure.AzureException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests for the AzureFSFactory.
+ */
+@RunWith(Parameterized.class)
+public class AzureFSFactoryTest extends TestLogger {
+
+	@Parameterized.Parameter
+	public String scheme;
+
+	@Parameterized.Parameters(name = "Scheme = {0}")
+	public static List<String> parameters() {
+		return Arrays.asList("wasb", "wasbs");
+	}
+
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+	private AbstractAzureFSFactory getFactory(String scheme) {
+		return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory();
+	}
+
+	@Test
+	public void testNullFsURI() throws Exception {
+		URI uri = null;
+		AbstractAzureFSFactory factory = getFactory(scheme);
+
+		exception.expect(NullPointerException.class);
+		exception.expectMessage("passed file system URI object should not be null");
+
+		factory.create(uri);
+	}
+
+	// missing credentials
+	@Test
+	public void testCreateFsWithAuthorityMissingCreds() throws Exception {
+		String uriString = String.format("%s://yourcontainer@youraccount.blob.core.windows.net/testDir", scheme);
+		final URI uri = URI.create(uriString);
+
+		exception.expect(AzureException.class);
+
+		AbstractAzureFSFactory factory = getFactory(scheme);
+		Configuration config = new Configuration();
+		config.setInteger("fs.azure.io.retry.max.retries", 0);
+		factory.configure(config);
+		factory.create(uri);
+	}
+
+	@Test
+	public void testCreateFsWithMissingAuthority() throws Exception {
+		String uriString = String.format("%s:///my/path", scheme);
+		final URI uri = URI.create(uriString);
+
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Cannot initialize WASB file system, URI authority not recognized.");
+
+		AbstractAzureFSFactory factory = getFactory(scheme);
+		factory.configure(new Configuration());
+		factory.create(uri);
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
new file mode 100644
index 0000000..6c65be9
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import com.microsoft.azure.credentials.ApplicationTokenCredentials;
+import com.microsoft.azure.credentials.AzureTokenCredentials;
+import com.microsoft.azure.management.Azure;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for Azure based
+ * file system.
+ */
+@RunWith(Parameterized.class)
+public class AzureFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+	@Parameterized.Parameter
+	public String scheme;
+
+	private static final String CONTAINER = System.getenv("ARTIFACTS_AZURE_CONTAINER");
+	private static final String ACCOUNT = System.getenv("ARTIFACTS_AZURE_STORAGE_ACCOUNT");
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AZURE_ACCESS_KEY");
+	private static final String RESOURCE_GROUP = System.getenv("ARTIFACTS_AZURE_RESOURCE_GROUP");
+	private static final String SUBSCRIPTION_ID = System.getenv("ARTIFACTS_AZURE_SUBSCRIPTION_ID");
+	private static final String TOKEN_CREDENTIALS_FILE = System.getenv("ARTIFACTS_AZURE_TOKEN_CREDENTIALS_FILE");
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	// Azure Blob Storage defaults to https only storage accounts. We check if http support has been
+	// enabled on a best effort basis and test http if so.
+	@Parameterized.Parameters(name = "Scheme = {0}")
+	public static List<String> parameters() throws IOException {
+		boolean httpsOnly = isHttpsTrafficOnly();
+		return httpsOnly ? Arrays.asList("wasbs") : Arrays.asList("wasb", "wasbs");
+	}
+
+	private static boolean isHttpsTrafficOnly() throws IOException {
+		if (StringUtils.isNullOrWhitespaceOnly(RESOURCE_GROUP) || StringUtils.isNullOrWhitespaceOnly(TOKEN_CREDENTIALS_FILE)) {
+			// default to https only, as some fields are missing
+			return true;
+		}
+
+		Assume.assumeTrue("Azure storage account not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCOUNT));
+
+		AzureTokenCredentials credentials = ApplicationTokenCredentials.fromFile(new File(TOKEN_CREDENTIALS_FILE));
+		Azure azure =
+			StringUtils.isNullOrWhitespaceOnly(SUBSCRIPTION_ID) ?
+				Azure.authenticate(credentials).withDefaultSubscription() :
+				Azure.authenticate(credentials).withSubscription(SUBSCRIPTION_ID);
+
+		return azure.storageAccounts().getByResourceGroup(RESOURCE_GROUP, ACCOUNT).inner().enableHttpsTrafficOnly();
+	}
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials and container details exist
+		Assume.assumeTrue("Azure container not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(CONTAINER));
+		Assume.assumeTrue("Azure access key not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		// fs.azure.account.key.youraccount.blob.core.windows.net = ACCESS_KEY
+		conf.setString("fs.azure.account.key." + ACCOUNT + ".blob.core.windows.net", ACCESS_KEY);
+		FileSystem.initialize(conf);
+	}
+
+	@AfterClass
+	public static void clearFsConfig() throws IOException {
+		FileSystem.initialize(new Configuration());
+	}
+
+	@Override
+	public FileSystem getFileSystem() throws Exception {
+		return getBasePath().getFileSystem();
+	}
+
+	@Override
+	public Path getBasePath() {
+		// wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDataDir
+		String uriString = scheme + "://" + CONTAINER + '@' + ACCOUNT + ".blob.core.windows.net/" + TEST_DATA_DIR;
+		return new Path(uriString);
+	}
+
+	@Test
+	public void testSimpleFileWriteAndRead() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
+
+		final String testLine = "Hello Upload!";
+
+		final Path path = new Path(getBasePath() + "/test.txt");
+		final FileSystem fs = path.getFileSystem();
+
+		try {
+			try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE);
+				OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+				writer.write(testLine);
+			}
+
+			// just in case, wait for the path to exist
+			checkPathEventualExistence(fs, path, true, deadline);
+
+			try (FSDataInputStream in = fs.open(path);
+				InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
+				BufferedReader reader = new BufferedReader(ir)) {
+				String line = reader.readLine();
+				assertEquals(testLine, line);
+			}
+		}
+		finally {
+			fs.delete(path, false);
+		}
+
+		// now file must be gone
+		checkPathEventualExistence(fs, path, false, deadline);
+	}
+
+	@Test
+	public void testDirectoryListing() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
+
+		final Path directory = new Path(getBasePath() + "/testdir/");
+		final FileSystem fs = directory.getFileSystem();
+
+		// directory must not yet exist
+		assertFalse(fs.exists(directory));
+
+		try {
+			// create directory
+			assertTrue(fs.mkdirs(directory));
+
+			checkPathEventualExistence(fs, directory, true, deadline);
+
+			// directory empty
+			assertEquals(0, fs.listStatus(directory).length);
+
+			// create some files
+			final int numFiles = 3;
+			for (int i = 0; i < numFiles; i++) {
+				Path file = new Path(directory, "/file-" + i);
+				try (FSDataOutputStream out = fs.create(file, FileSystem.WriteMode.OVERWRITE);
+					OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+					writer.write("hello-" + i + "\n");
+				}
+				// just in case, wait for the file to exist (should then also be reflected in the
+				// directory's file list below)
+				checkPathEventualExistence(fs, file, true, deadline);
+			}
+
+			FileStatus[] files = fs.listStatus(directory);
+			assertNotNull(files);
+			assertEquals(numFiles, files.length);
+
+			for (FileStatus status : files) {
+				assertFalse(status.isDir());
+			}
+
+			// now that there are files, the directory must exist
+			assertTrue(fs.exists(directory));
+		}
+		finally {
+			// clean up
+			fs.delete(directory, true);
+		}
+
+		// now directory must be gone
+		checkPathEventualExistence(fs, directory, false, deadline);
+	}
+
+	@Override
+	public FileSystemKind getFileSystemKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 6a5976a..1135e01 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -224,8 +224,8 @@ public class HadoopFileSystem extends FileSystem {
 	static FileSystemKind getKindForScheme(String scheme) {
 		scheme = scheme.toLowerCase(Locale.US);
 
-		if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss")) {
-			// the Amazon S3 storage or Aliyun OSS storage
+		if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss") || scheme.startsWith("wasb")) {
+			// the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
 			return FileSystemKind.OBJECT_STORE;
 		}
 		else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
similarity index 99%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
rename to flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
index 1bbb757..aa8fdfe 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.fs.s3.common;
+package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.Configuration;
 
diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml
index 0b640a4..00d4086 100644
--- a/flink-filesystems/flink-s3-fs-base/pom.xml
+++ b/flink-filesystems/flink-s3-fs-base/pom.xml
@@ -166,7 +166,7 @@ under the License.
 								<filter>
 									<artifact>org.apache.flink:flink-hadoop-fs</artifact>
 									<excludes>
-										<exclude>org/apache/flink/runtime/util/**</exclude>
+										<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
 										<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
 									</excludes>
 								</filter>
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index ff575be..a576a96 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index 943de1d..ebf3b67 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
index 9a5a80c..e7cf95e 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
@@ -106,6 +106,11 @@ under the License.
 									<pattern>com.amazon</pattern>
 									<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
 								</relocation>
+								<!-- shade Flink's Hadoop FS utility classes -->
+								<relocation>
+									<pattern>org.apache.flink.runtime.util</pattern>
+									<shadedPattern>org.apache.flink.fs.s3hadoop.common</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 2637e7b..6cad051 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3hadoop;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index 4471b38..57500f3 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3hadoop;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 
 import org.junit.Test;
 
diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml
index 8f88bbf..3fc8e03 100644
--- a/flink-filesystems/flink-s3-fs-presto/pom.xml
+++ b/flink-filesystems/flink-s3-fs-presto/pom.xml
@@ -290,6 +290,12 @@ under the License.
 									<pattern>com.google</pattern>
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.google</shadedPattern>
 								</relocation>
+
+								<!-- shade Flink's Hadoop FS utility classes -->
+								<relocation>
+									<pattern>org.apache.flink.runtime.util</pattern>
+									<shadedPattern>org.apache.flink.fs.s3presto.common</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index c0c1beb..5a1ffee 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.s3.PrestoS3FileSystem;
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 093efc8..f3117a2 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
-import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml
index 8da2cea..c84e853 100644
--- a/flink-filesystems/pom.xml
+++ b/flink-filesystems/pom.xml
@@ -47,6 +47,7 @@ under the License.
 		<module>flink-s3-fs-presto</module>
 		<module>flink-swift-fs-hadoop</module>
 		<module>flink-oss-fs-hadoop</module>
+		<module>flink-azure-fs-hadoop</module>
 	</modules>
 
 	<!-- Common dependency setup for all filesystems -->