Posted to commits@asterixdb.apache.org by dl...@apache.org on 2020/10/21 15:33:02 UTC

[asterixdb] branch master updated: [ASTERIXDB-2780][EXT] Add support for Azure Blob Storage as External Dataset

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e86d900  [ASTERIXDB-2780][EXT] Add support for Azure Blob Storage as External Dataset
e86d900 is described below

commit e86d900e4bebb65ea819c4b8c282a23869c8be8b
Author: Hussain Towaileb <Hu...@Couchbase.com>
AuthorDate: Fri Oct 16 16:52:27 2020 +0300

    [ASTERIXDB-2780][EXT] Add support for Azure Blob Storage as External Dataset
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Added support for Azure Blob Storage as External Dataset.
    - Added test cases for the above-mentioned item.
    - Refactored the S3 and Azure Blob Storage test cases so that
      both external datasets now reuse the same test files.
    
    Change-Id: I93480bf25f534f212e93492cbb68db494ea20032
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7524
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
 asterixdb/asterix-app/pom.xml                      |   5 +
 .../apache/asterix/test/common/TestConstants.java  |  16 +-
 .../apache/asterix/test/common/TestExecutor.java   |   9 +
 ...BlobStorageExternalDatasetOnePartitionTest.java |  59 ++
 .../AzureBlobStorageExternalDatasetTest.java       | 594 +++++++++++++++++++++
 .../asterix/test/runtime/LangExecutionUtil.java    |  15 +-
 ...stsuite_external_dataset_azure_blob_storage.xml | 292 ++++++++++
 ...al_dataset_azure_blob_storage_one_partition.xml |  99 ++++
 asterixdb/asterix-external-data/pom.xml            |   4 +
 .../abstracts/AbstractExternalInputStream.java     | 110 ++++
 .../AbstractExternalInputStreamFactory.java        | 175 ++++++
 .../input/record/reader/aws/AwsS3InputStream.java  |  78 +--
 .../record/reader/aws/AwsS3InputStreamFactory.java | 117 +---
 .../record/reader/azure/AzureBlobInputStream.java  |  89 +++
 .../reader/azure/AzureBlobInputStreamFactory.java  | 153 ++++++
 .../reader/azure/AzureBlobReaderFactory.java       |  73 +++
 .../external/util/ExternalDataConstants.java       |  22 +
 .../asterix/external/util/ExternalDataUtils.java   | 190 +++++--
 ...pache.asterix.external.api.IRecordReaderFactory |   1 +
 asterixdb/asterix-server/pom.xml                   |  10 +
 asterixdb/pom.xml                                  |  50 ++
 .../appended-resources/supplemental-models.xml     | 114 ++++
 ...com_Azure_azure-sdk-for-java_master_LICENSE.txt |  21 +
 ....com_Azure_azure-sdk-for-java_master_NOTICE.txt | 159 ++++++
 24 files changed, 2209 insertions(+), 246 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 4c5a882..7bf5127 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -855,6 +855,11 @@
       <artifactId>akka-http-core_2.12</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Azure -->
+    <dependency>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-blob</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-1.2-api</artifactId>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 71468dc..ef2ec6c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -41,17 +41,13 @@ public class TestConstants {
     public static final String AZURE_ACCOUNT_KEY_PLACEHOLDER = "%accountKey%";
     public static final String AZURE_AZURITE_ACCOUNT_KEY_DEFAULT =
             "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsu" + "Fq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
-    public static final String AZURE_DEFAULT_ENDPOINTS_PROTOCOL_PLACEHOLDER = "%defaultEndpointsProtocol%";
-    public static final String AZURE_DEFAULT_ENDPOINTS_PROTOCOL_DEFAULT = "http";
     public static final String AZURE_BLOB_ENDPOINT_PLACEHOLDER = "%blobEndpoint%";
     public static final String AZURE_BLOB_ENDPOINT_DEFAULT =
             "http://localhost:20000/" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT;
-    public static final String AZURE_TEMPLATE = "(\"defaultEndpointsProtocol\"=\""
-            + AZURE_DEFAULT_ENDPOINTS_PROTOCOL_DEFAULT + "\"),\n" + "(\"accountName\"=\""
-            + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT + "\"),\n" + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT
-            + "\"),\n" + "(\"blobEndpoint\"=\"" + AZURE_BLOB_ENDPOINT_PLACEHOLDER + "\")";
-    public static final String AZURE_TEMPLATE_DEFAULT = "(\"defaultEndpointsProtocol\"=\""
-            + AZURE_DEFAULT_ENDPOINTS_PROTOCOL_DEFAULT + "\"),\n" + "(\"accountName\"=\""
-            + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT + "\"),\n" + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT
-            + "\"),\n" + "(\"blobEndpoint\"=\"" + AZURE_BLOB_ENDPOINT_DEFAULT + "\")";
+    public static final String AZURE_TEMPLATE = "(\"accountName\"=\"" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT + "\"),\n"
+            + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT + "\"),\n" + "(\"blobEndpoint\"=\""
+            + AZURE_BLOB_ENDPOINT_PLACEHOLDER + "\")";
+    public static final String AZURE_TEMPLATE_DEFAULT = "(\"accountName\"=\"" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT
+            + "\"),\n" + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT + "\"),\n" + "(\"blobEndpoint\"=\""
+            + AZURE_BLOB_ENDPOINT_DEFAULT + "\")";
 }
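
A minimal sketch (not part of the patch) of how a test can expand the
simplified AZURE_TEMPLATE above into a concrete adapter configuration,
assuming the Azurite defaults used elsewhere in this commit:

    // Resolve the single remaining %blobEndpoint% placeholder.
    String config = TestConstants.AZURE_TEMPLATE.replace(
            TestConstants.AZURE_BLOB_ENDPOINT_PLACEHOLDER,
            "http://localhost:20000/devstoreaccount1");
    // config now reads:
    //   ("accountName"="devstoreaccount1"),
    //   ("accountKey"="Eby8vdM0..."),
    //   ("blobEndpoint"="http://localhost:20000/devstoreaccount1")
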
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index a21b718..f535f2c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -2058,6 +2058,11 @@ public class TestExecutor {
             if (placeholder.getName().equals("adapter")) {
                 str = str.replace("%adapter%", placeholder.getValue());
 
+                // Terminate early if there are no template placeholders to replace
+                if (noTemplateRequired(str)) {
+                    return str;
+                }
+
                 if (placeholder.getValue().equalsIgnoreCase("S3")) {
                     return applyS3Substitution(str, placeholders);
                 } else if (placeholder.getValue().equalsIgnoreCase("AzureBlob")) {
@@ -2071,6 +2076,10 @@ public class TestExecutor {
         return str;
     }
 
+    protected boolean noTemplateRequired(String str) {
+        return !str.contains("%template%");
+    }
+
     protected String applyS3Substitution(String str, List<Placeholder> placeholders) {
         boolean isReplaced = false;
         boolean hasRegion = false;
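
The early exit added above skips adapter-specific substitution for statements
that carry no template. A self-contained sketch of the predicate's behavior,
with hypothetical statements:

    // Returns true when the statement needs no template substitution.
    static boolean noTemplateRequired(String str) {
        return !str.contains("%template%");
    }

    // noTemplateRequired("USING AZUREBLOB (%template%)") -> false: substitution runs
    // noTemplateRequired("USING AZUREBLOB ()")           -> true: statement returned as-is
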
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetOnePartitionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetOnePartitionTest.java
new file mode 100644
index 0000000..9e66207
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetOnePartitionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.microsoft;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+
+@Ignore
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class AzureBlobStorageExternalDatasetOnePartitionTest extends AzureBlobStorageExternalDatasetTest {
+
+    public AzureBlobStorageExternalDatasetOnePartitionTest(TestCaseContext tcCtx) {
+        super(tcCtx);
+    }
+
+    @Parameterized.Parameters(name = "AzureBlobStorageExternalDatasetOnePartitionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        SUITE_TESTS = "testsuite_external_dataset_azure_blob_storage_one_partition.xml";
+        ONLY_TESTS = "only_external_dataset.xml";
+        TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
+        PREPARE_PLAYGROUND_CONTAINER = AzureBlobStorageExternalDatasetOnePartitionTest::preparePlaygroundContainer;
+        PREPARE_FIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetOnePartitionTest::prepareFixedDataContainer;
+        PREPARE_MIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetOnePartitionTest::prepareMixedDataContainer;
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    private static void preparePlaygroundContainer() {
+    }
+
+    private static void prepareFixedDataContainer() {
+    }
+
+    private static void prepareMixedDataContainer() {
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
new file mode 100644
index 0000000..da9f912
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.microsoft;
+
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.context.TestFileContext;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+
+@Ignore
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class AzureBlobStorageExternalDatasetTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    // Subclasses of this class MUST initialize these variables before using them to avoid unexpected behavior
+    static String SUITE_TESTS;
+    static String ONLY_TESTS;
+    static String TEST_CONFIG_FILE_NAME;
+    static Runnable PREPARE_PLAYGROUND_CONTAINER;
+    static Runnable PREPARE_FIXED_DATA_CONTAINER;
+    static Runnable PREPARE_MIXED_DATA_CONTAINER;
+
+    // Base directory paths for data files
+    private static final String JSON_DATA_PATH = joinPath("data", "json");
+    private static final String CSV_DATA_PATH = joinPath("data", "csv");
+    private static final String TSV_DATA_PATH = joinPath("data", "tsv");
+    private static final String MIXED_DATA_PATH = joinPath("data", "mixed");
+
+    // Service endpoint
+    private static final int BLOB_SERVICE_PORT = 20000;
+    private static final String BLOB_SERVICE_ENDPOINT = "http://localhost:" + BLOB_SERVICE_PORT;
+
+    // Containers and definitions
+    private static final String PLAYGROUND_CONTAINER = "playground";
+    private static final String FIXED_DATA_CONTAINER = "fixed-data"; // Do not modify; contains fixed data
+    private static final String INCLUDE_EXCLUDE_CONTAINER = "include-exclude";
+    private static final String JSON_DEFINITION = "json-data/reviews/";
+    private static final String CSV_DEFINITION = "csv-data/reviews/";
+    private static final String TSV_DEFINITION = "tsv-data/reviews/";
+
+    // Used by a test that generates over 1000 files
+    private static final String OVER_1000_OBJECTS_PATH = "over-1000-objects";
+    private static final int OVER_1000_OBJECTS_COUNT = 2999;
+
+    private static final Set<String> fileNames = new HashSet<>();
+
+    // Connection string for the local Azurite emulator; the BlobServiceClient built from it creates the container clients
+    private static final String connectionString = "AccountName=devstoreaccount1;"
+            + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
+            + "BlobEndpoint=" + BLOB_SERVICE_ENDPOINT + "/devstoreaccount1;";
+    private static BlobServiceClient blobServiceClient;
+    private static BlobContainerClient playgroundContainer;
+
+    protected TestCaseContext tcCtx;
+
+    public AzureBlobStorageExternalDatasetTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        final TestExecutor testExecutor = new AzureTestExecutor();
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        setNcEndpoints(testExecutor);
+        createBlobServiceClient();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "AzureBlobStorageExternalDatasetTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        SUITE_TESTS = "testsuite_external_dataset_azure_blob_storage.xml";
+        ONLY_TESTS = "only_external_dataset.xml";
+        TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+        PREPARE_PLAYGROUND_CONTAINER = AzureBlobStorageExternalDatasetTest::preparePlaygroundContainer;
+        PREPARE_FIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetTest::prepareFixedDataContainer;
+        PREPARE_MIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetTest::prepareMixedDataContainer;
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static void setNcEndpoints(TestExecutor testExecutor) {
+        final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+        final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+        final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+        for (NodeControllerService nc : ncs) {
+            final String nodeId = nc.getId();
+            final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+            int apiPort = appCtx.getExternalProperties().getNcApiPort();
+            ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+        }
+        testExecutor.setNcEndPoints(ncEndPoints);
+    }
+
+    private static void createBlobServiceClient() {
+        LOGGER.info("Creating Azurite Blob Service client");
+        blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString).buildClient();
+        LOGGER.info("Azurite Blob Service client created successfully");
+
+        // Create the containers and upload the test files
+        PREPARE_PLAYGROUND_CONTAINER.run();
+        PREPARE_FIXED_DATA_CONTAINER.run();
+        PREPARE_MIXED_DATA_CONTAINER.run();
+    }
+
+    /**
+     * Creates a container and fills it with files for testing purposes.
+     */
+    private static void preparePlaygroundContainer() {
+        deleteContainerSilently(PLAYGROUND_CONTAINER);
+
+        LOGGER.info("creating container " + PLAYGROUND_CONTAINER);
+        playgroundContainer = blobServiceClient.createBlobContainer(PLAYGROUND_CONTAINER);
+        LOGGER.info("container " + PLAYGROUND_CONTAINER + " created successfully");
+
+        LOGGER.info("Adding JSON files");
+        loadJsonFiles();
+        LOGGER.info("JSON Files added successfully");
+
+        LOGGER.info("Adding CSV files");
+        loadCsvFiles();
+        LOGGER.info("CSV Files added successfully");
+
+        LOGGER.info("Adding TSV files");
+        loadTsvFiles();
+        LOGGER.info("TSV Files added successfully");
+
+        LOGGER.info("Loading " + OVER_1000_OBJECTS_COUNT + " into " + OVER_1000_OBJECTS_PATH);
+        loadLargeNumberOfFiles();
+        LOGGER.info("Added " + OVER_1000_OBJECTS_COUNT + " files into " + OVER_1000_OBJECTS_PATH + " successfully");
+    }
+
+    /**
+     * This container is filled with fixed data, and a test counts all of its records. If the contents of
+     * this container change, that test will fail and its expected result will need to be updated.
+     */
+    private static void prepareFixedDataContainer() {
+        deleteContainerSilently(FIXED_DATA_CONTAINER);
+        LOGGER.info("creating container " + FIXED_DATA_CONTAINER);
+        BlobContainerClient fixedDataContainer = blobServiceClient.createBlobContainer(FIXED_DATA_CONTAINER);
+        LOGGER.info("container " + FIXED_DATA_CONTAINER + " created successfully");
+
+        LOGGER.info("Loading fixed data to " + FIXED_DATA_CONTAINER);
+
+        // Files data
+        Path filePath = Paths.get(JSON_DATA_PATH, "single-line", "20-records.json");
+        fixedDataContainer.getBlobClient("1.json").uploadFromFile(filePath.toString());
+        fixedDataContainer.getBlobClient("2.json").uploadFromFile(filePath.toString());
+        fixedDataContainer.getBlobClient("lvl1/3.json").uploadFromFile(filePath.toString());
+        fixedDataContainer.getBlobClient("lvl1/4.json").uploadFromFile(filePath.toString());
+        fixedDataContainer.getBlobClient("lvl1/lvl2/5.json").uploadFromFile(filePath.toString());
+    }
+
+    private static void loadJsonFiles() {
+        String dataBasePath = JSON_DATA_PATH;
+        String definition = JSON_DEFINITION;
+
+        // Normal format
+        String definitionSegment = "json";
+        loadData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+        loadData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+        loadData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+        loadData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+                false);
+
+        definitionSegment = "json-array-of-objects";
+        loadData(dataBasePath, "single-line", "array_of_objects.json", "json-data/", definitionSegment, false, false);
+
+        // gz compressed format
+        definitionSegment = "gz";
+        loadGzData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+                false);
+
+        // Mixed normal and gz compressed format
+        definitionSegment = "mixed";
+        loadData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+        loadData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+        loadData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+        loadData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+                false);
+        loadGzData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+                false);
+    }
+
+    private static void loadCsvFiles() {
+        String dataBasePath = CSV_DATA_PATH;
+        String definition = CSV_DEFINITION;
+
+        // Normal format
+        String definitionSegment = "csv";
+        loadData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+        loadData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+
+        // gz compressed format
+        definitionSegment = "gz";
+        loadGzData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+
+        // Mixed normal and gz compressed format
+        definitionSegment = "mixed";
+        loadData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+        loadData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+    }
+
+    private static void loadTsvFiles() {
+        String dataBasePath = TSV_DATA_PATH;
+        String definition = TSV_DEFINITION;
+
+        // Normal format
+        String definitionSegment = "tsv";
+        loadData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+        loadData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+
+        // gz compressed format
+        definitionSegment = "gz";
+        loadGzData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+
+        // Mixed normal and gz compressed format
+        definitionSegment = "mixed";
+        loadData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+        loadData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+        loadGzData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+    }
+
+    private static void loadData(String fileBasePath, String filePathSegment, String filename, String definition,
+            String definitionSegment, boolean removeExtension) {
+        loadData(fileBasePath, filePathSegment, filename, definition, definitionSegment, removeExtension, true);
+    }
+
+    private static void loadData(String fileBasePath, String filePathSegment, String filename, String definition,
+            String definitionSegment, boolean removeExtension, boolean copyToSubLevels) {
+        // Files data
+        Path filePath = Paths.get(fileBasePath, filePathSegment, filename);
+
+        // Keep or remove the file extension
+        Assert.assertFalse("Files with no extension are not supported yet for external datasets", removeExtension);
+        String finalFileName;
+        if (removeExtension) {
+            finalFileName = FilenameUtils.removeExtension(filename);
+        } else {
+            finalFileName = filename;
+        }
+
+        // Files base definition
+        filePathSegment = filePathSegment.isEmpty() ? "" : filePathSegment + "/";
+        definitionSegment = definitionSegment.isEmpty() ? "" : definitionSegment + "/";
+        String basePath = definition + filePathSegment + definitionSegment;
+
+        // Load the data
+        playgroundContainer.getBlobClient(basePath + finalFileName).uploadFromFile(filePath.toString());
+        if (copyToSubLevels) {
+            playgroundContainer.getBlobClient(basePath + "level1a/" + finalFileName)
+                    .uploadFromFile(filePath.toString());
+            playgroundContainer.getBlobClient(basePath + "level1b/" + finalFileName)
+                    .uploadFromFile(filePath.toString());
+            playgroundContainer.getBlobClient(basePath + "level1a/level2a/" + finalFileName)
+                    .uploadFromFile(filePath.toString());
+            playgroundContainer.getBlobClient(basePath + "level1a/level2b/" + finalFileName)
+                    .uploadFromFile(filePath.toString());
+        }
+    }
+
+    private static void loadGzData(String fileBasePath, String filePathSegment, String filename, String definition,
+            String definitionSegment, boolean removeExtension) {
+        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+                GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
+
+            // Files data
+            Path filePath = Paths.get(fileBasePath, filePathSegment, filename);
+
+            // Get the compressed data
+            gzipOutputStream.write(Files.readAllBytes(filePath));
+            gzipOutputStream.close(); // Must close to flush the GZIP trailer, or the data will be invalid
+            byte[] gzipBytes = byteArrayOutputStream.toByteArray();
+
+            // Keep or remove the file extension
+            Assert.assertFalse("Files with no extension are not supported yet for external datasets", removeExtension);
+            String finalFileName;
+            if (removeExtension) {
+                finalFileName = FilenameUtils.removeExtension(filename);
+            } else {
+                finalFileName = filename;
+            }
+            finalFileName += ".gz";
+
+            // Files base definition
+            filePathSegment = filePathSegment.isEmpty() ? "" : filePathSegment + "/";
+            definitionSegment = definitionSegment.isEmpty() ? "" : definitionSegment + "/";
+            String basePath = definition + filePathSegment + definitionSegment;
+
+            // Load the data
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(gzipBytes);
+            playgroundContainer.getBlobClient(basePath + finalFileName).upload(inputStream, inputStream.available());
+            inputStream.reset();
+            playgroundContainer.getBlobClient(basePath + "level1a/" + finalFileName).upload(inputStream,
+                    inputStream.available());
+            inputStream.reset();
+            playgroundContainer.getBlobClient(basePath + "level1b/" + finalFileName).upload(inputStream,
+                    inputStream.available());
+            inputStream.reset();
+            playgroundContainer.getBlobClient(basePath + "level1a/level2a/" + finalFileName).upload(inputStream,
+                    inputStream.available());
+            inputStream.reset();
+            playgroundContainer.getBlobClient(basePath + "level1a/level2b/" + finalFileName).upload(inputStream,
+                    inputStream.available());
+            closeInputStreamSilently(inputStream);
+        } catch (Exception ex) {
+            LOGGER.error(ex.getMessage());
+        }
+    }
+
+    private static void loadLargeNumberOfFiles() {
+        ByteArrayInputStream inputStream = null;
+        for (int i = 0; i < OVER_1000_OBJECTS_COUNT; i++) {
+            inputStream = new ByteArrayInputStream(("{\"id\":" + i + "}").getBytes());
+            playgroundContainer.getBlobClient(OVER_1000_OBJECTS_PATH + "/" + i + ".json").upload(inputStream,
+                    inputStream.available());
+        }
+        closeInputStreamSilently(inputStream);
+    }
+
+    /**
+     * Loads a combination of different file formats in the same path
+     */
+    private static void prepareMixedDataContainer() {
+        deleteContainerSilently(INCLUDE_EXCLUDE_CONTAINER);
+        LOGGER.info("creating container " + INCLUDE_EXCLUDE_CONTAINER);
+        BlobContainerClient includeExcludeContainer = blobServiceClient.createBlobContainer(INCLUDE_EXCLUDE_CONTAINER);
+        LOGGER.info("container " + INCLUDE_EXCLUDE_CONTAINER + " created successfully");
+
+        // JSON
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(("{\"id\":" + 1 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/extension/" + "hello-world-2018.json")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 2 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/extension/" + "hello-world-2019.json")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 3 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/extension/" + "hello-world-2020.json")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 4 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/EXTENSION/" + "goodbye-world-2018.json")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 5 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/EXTENSION/" + "goodbye-world-2019.json")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 6 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/EXTENSION/" + "goodbye-world-2020.json")
+                .upload(inputStream, inputStream.available());
+
+        // CSV
+        inputStream = new ByteArrayInputStream(("7,\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/extension/" + "hello-world-2018.csv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("8,\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/extension/" + "hello-world-2019.csv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("9,\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/extension/" + "hello-world-2020.csv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("10,\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/EXTENSION/" + "goodbye-world-2018.csv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("11,\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/EXTENSION/" + "goodbye-world-2019.csv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("12,\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/EXTENSION/" + "goodbye-world-2020.csv")
+                .upload(inputStream, inputStream.available());
+
+        // TSV
+        inputStream = new ByteArrayInputStream(("13\t\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/extension/" + "hello-world-2018.tsv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("14\t\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/extension/" + "hello-world-2019.tsv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("15\t\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/extension/" + "hello-world-2020.tsv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("16\t\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/EXTENSION/" + "goodbye-world-2018.tsv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("17\t\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/EXTENSION/" + "goodbye-world-2019.tsv")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("18\t\"good\"").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/EXTENSION/" + "goodbye-world-2020.tsv")
+                .upload(inputStream, inputStream.available());
+
+        // JSON no extension
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 1 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/no-extension/" + "hello-world-2018")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 2 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/no-extension/" + "hello-world-2019")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 3 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/no-extension/" + "hello-world-2020")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 4 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/NO-EXTENSION/" + "goodbye-world-2018")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 5 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/NO-EXTENSION/" + "goodbye-world-2019")
+                .upload(inputStream, inputStream.available());
+        inputStream = new ByteArrayInputStream(("{\"id\":" + 6 + "}").getBytes());
+        includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/NO-EXTENSION/" + "goodbye-world-2020")
+                .upload(inputStream, inputStream.available());
+
+        closeInputStreamSilently(inputStream);
+    }
+
+    static class AzureTestExecutor extends TestExecutor {
+
+        public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+                String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit,
+                MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
+                BitSet expectedWarnings) throws Exception {
+            String[] lines;
+            switch (ctx.getType()) {
+                case "container":
+                    // <container> <def> <sub-path:new_fname:src_file1,sub-path:new_fname:src_file2,sub-path:src_file3>
+                    lines = TestExecutor.stripAllComments(statement).trim().split("\n");
+                    String lastLine = lines[lines.length - 1];
+                    String[] command = lastLine.trim().split(" ");
+                    int length = command.length;
+                    if (length != 3) {
+                        throw new Exception("invalid create container format");
+                    }
+                    dropRecreateContainer(command[0], command[1], command[2]);
+                    break;
+                default:
+                    super.executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
+                            queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
+            }
+        }
+    }
+
+    private static void dropRecreateContainer(String containerName, String definition, String files) {
+        String definitionPath = definition + (definition.endsWith("/") ? "" : "/");
+        String[] fileSplits = files.split(",");
+
+        LOGGER.info("Deleting container " + containerName);
+        try {
+            blobServiceClient.deleteBlobContainer(containerName);
+        } catch (Exception ex) {
+            // Ignore
+        }
+        LOGGER.info("Container " + containerName + " deleted successfully");
+
+        BlobContainerClient containerClient;
+        LOGGER.info("Creating container " + containerName);
+        containerClient = blobServiceClient.createBlobContainer(containerName);
+        LOGGER.info("Uploading to container " + containerName + " definition " + definitionPath);
+        fileNames.clear();
+        for (String fileSplit : fileSplits) {
+            String[] pathAndSourceFile = fileSplit.split(":");
+            int size = pathAndSourceFile.length;
+            String path;
+            String sourceFilePath;
+            String uploadedFileName;
+            if (size == 1) {
+                // case: playground json-data/reviews SOURCE_FILE1,SOURCE_FILE2
+                path = definitionPath;
+                sourceFilePath = pathAndSourceFile[0];
+                uploadedFileName = FilenameUtils.getName(pathAndSourceFile[0]);
+            } else if (size == 2) {
+                // case: playground json-data/reviews level1/sub-level:SOURCE_FILE1,level2/sub-level:SOURCE_FILE2
+                String subPathOrNewFileName = pathAndSourceFile[0];
+                if (subPathOrNewFileName.startsWith("$$")) {
+                    path = definitionPath;
+                    sourceFilePath = pathAndSourceFile[1];
+                    uploadedFileName = subPathOrNewFileName.substring(2);
+                } else {
+                    path = definitionPath + subPathOrNewFileName + (subPathOrNewFileName.endsWith("/") ? "" : "/");
+                    sourceFilePath = pathAndSourceFile[1];
+                    uploadedFileName = FilenameUtils.getName(pathAndSourceFile[1]);
+                }
+            } else if (size == 3) {
+                path = definitionPath + pathAndSourceFile[0] + (pathAndSourceFile[0].endsWith("/") ? "" : "/");
+                uploadedFileName = pathAndSourceFile[1];
+                sourceFilePath = pathAndSourceFile[2];
+
+            } else {
+                throw new IllegalArgumentException();
+            }
+
+            String keyPath = path + uploadedFileName;
+            int k = 1;
+            while (fileNames.contains(keyPath)) {
+                keyPath = path + (k++) + uploadedFileName;
+            }
+            fileNames.add(keyPath);
+            containerClient.getBlobClient(keyPath).uploadFromFile(sourceFilePath);
+        }
+        LOGGER.info("Done creating container with data");
+    }
+
+    private static void deleteContainerSilently(String containerName) {
+        LOGGER.info("Deleting container " + containerName);
+        try {
+            blobServiceClient.deleteBlobContainer(containerName);
+        } catch (Exception ex) {
+            // Do nothing
+        }
+        LOGGER.info("Container " + containerName + " deleted successfully");
+    }
+
+    private static void closeInputStreamSilently(InputStream inputStream) {
+        try {
+            inputStream.close();
+        } catch (Exception ex) {
+            LOGGER.error(ex.getMessage());
+        }
+    }
+}
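
The test above drives everything through a local Azurite emulator. A
standalone sketch of the same azure-storage-blob calls it relies on, assuming
Azurite is listening on port 20000 with its well-known development
credentials:

    import java.io.ByteArrayInputStream;

    import com.azure.storage.blob.BlobContainerClient;
    import com.azure.storage.blob.BlobServiceClient;
    import com.azure.storage.blob.BlobServiceClientBuilder;

    public class AzuriteUploadSketch {
        public static void main(String[] args) {
            String connectionString = "AccountName=devstoreaccount1;"
                    + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
                    + "BlobEndpoint=http://localhost:20000/devstoreaccount1;";
            // Build the service client, create a container, and upload one blob,
            // mirroring createBlobServiceClient() and loadData() above.
            BlobServiceClient serviceClient =
                    new BlobServiceClientBuilder().connectionString(connectionString).buildClient();
            BlobContainerClient container = serviceClient.createBlobContainer("playground");
            byte[] data = "{\"id\":1}".getBytes();
            container.getBlobClient("json-data/reviews/json/1.json")
                    .upload(new ByteArrayInputStream(data), data.length);
        }
    }
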
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 20420ab..408882d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -95,17 +95,26 @@ public class LangExecutionUtil {
     }
 
     public static Collection<Object[]> tests(String onlyFilePath, String suiteFilePath) throws Exception {
-        Collection<Object[]> testArgs = buildTestsInXml(onlyFilePath);
+        return tests(onlyFilePath, suiteFilePath, null);
+    }
+
+    public static Collection<Object[]> tests(String onlyFilePath, String suiteFilePath, String pathBase)
+            throws Exception {
+        Collection<Object[]> testArgs = buildTestsInXml(onlyFilePath, pathBase);
         if (testArgs.size() == 0) {
-            testArgs = buildTestsInXml(suiteFilePath);
+            testArgs = buildTestsInXml(suiteFilePath, pathBase);
         }
         return testArgs;
     }
 
     protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+        return buildTestsInXml(xmlfile, null);
+    }
+
+    protected static Collection<Object[]> buildTestsInXml(String xmlfile, String pathBase) throws Exception {
         Collection<Object[]> testArgs = new ArrayList<>();
         TestCaseContext.Builder b = new TestCaseContext.Builder();
-        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+        for (TestCaseContext ctx : b.build(new File(pathBase == null ? PATH_BASE : pathBase), xmlfile)) {
             testArgs.add(new Object[] { ctx });
         }
         return testArgs;
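
The new pathBase parameter lets a caller resolve the suite XML against a
different base directory, falling back to PATH_BASE when null. A usage sketch
(the base directory below is an assumption, not taken from this patch):

    Collection<Object[]> testArgs = LangExecutionUtil.tests(
            "only_external_dataset.xml",
            "testsuite_external_dataset_azure_blob_storage.xml",
            "src/test/resources/runtimets" /* assumed base directory */);
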
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
new file mode 100644
index 0000000..a715e7e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
@@ -0,0 +1,292 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+  <test-group name="external-dataset">
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/json/json">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/json/json</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/json/gz">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/json/gz</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/json/mixed">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/json/mixed</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/csv/csv">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/csv/csv</output-dir>
+      </compilation-unit>
+    </test-case><test-case FilePath="external-dataset">
+    <compilation-unit name="common/csv/gz">
+      <placeholder name="adapter" value="AZUREBLOB" />
+      <output-dir compare="Text">common/csv/gz</output-dir>
+    </compilation-unit>
+  </test-case><test-case FilePath="external-dataset">
+    <compilation-unit name="common/csv/mixed">
+      <placeholder name="adapter" value="AZUREBLOB" />
+      <output-dir compare="Text">common/csv/mixed</output-dir>
+    </compilation-unit>
+  </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/tsv/tsv">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/tsv/tsv</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/tsv/gz">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/tsv/gz</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/tsv/mixed">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/tsv/mixed</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/empty-string-definition">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/empty-string-definition</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/over-1000-objects">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/over-1000-objects</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/malformed-json">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/malformed-json</output-dir>
+        <expected-error>Parsing error at malformed-data/duplicate-fields.json line 1 field field: Duplicate field 'field'</expected-error>
+        <expected-error>Parsing error at malformed-data/malformed-json.json line 1 field field: Unexpected character ('}' (code 125)): was expecting double-quote to start field name</expected-error>
+        <expected-error>Parsing error at malformed-data/malformed-json-2.json line 4 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+        <expected-error>Parsing error at malformed-data/malformed-jsonl-1.json line 3 field field2: Unrecognized token 'truee': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+        <expected-error>Parsing error at malformed-data/malformed-jsonl-2.json line 11 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/definition-does-not-exist">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/definition-does-not-exist</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/invalid-endpoint">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <placeholder name="blobEndpoint" value="http://^invalid-endpoint^" />
+        <output-dir compare="Text">common/invalid-endpoint</output-dir>
+        <expected-error>External source error. java.net.URISyntaxException: Illegal character in authority at index 7: http://^invalid-endpoint^</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/bucket-does-not-exist">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/bucket-does-not-exist</output-dir>
+        <expected-error>External source error. Status code 404</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/no-files-returned/definition-points-to-nothing">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/no-files-returned/definition-points-to-nothing</output-dir>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/no-files-returned/exclude-all-files">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/no-files-returned/exclude-all-files</output-dir>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/no-files-returned/include-no-files">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/no-files-returned/include-no-files</output-dir>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+  <test-group name="include-exclude">
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/bad-name-1">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/bad-name-1</output-dir>
+        <expected-error>Invalid format for property "exclude1"</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/bad-name-2">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/bad-name-2</output-dir>
+        <expected-error>Invalid format for property "exclude#"</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/bad-name-3">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/bad-name-3</output-dir>
+        <expected-error>Invalid format for property "exclude#hello"</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/both">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/both</output-dir>
+        <expected-error>The parameters "include" and "exclude" cannot be provided at the same time</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/exclude-all">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/exclude-all</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/exclude-1">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/exclude-1</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/exclude-2">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/exclude-2</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/exclude-3">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/exclude-3</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/exclude-4">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/exclude-4</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/exclude-5">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/exclude-5</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/exclude-6">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/exclude-6</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-all">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-all</output-dir>
+        <expected-error>Malformed input stream</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-1">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-1</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-2">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-2</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-3">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-3</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-4">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-4</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-5">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-5</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-6">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-6</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-7">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-7</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-8">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-8</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-9">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-9</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-10">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-10</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-11">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-11</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/include-exclude/include-12">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/include-exclude/include-12</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
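
Note on the include/exclude key grammar exercised by the bad-name cases above: property names may be bare ("include", "exclude") or carry an integer suffix ("include#1", "exclude#2"); keys like "exclude#" and "exclude#hello" are rejected. A minimal standalone sketch of that check (class and method names are invented; the digit test approximates the NumberUtils.isIntegerNumericString call used later in this patch):

    public class IncludeExcludeKeyFormatDemo {
        static boolean isValidKey(String key) {
            // Bare "include"/"exclude" are always accepted
            if (key.equals("include") || key.equals("exclude")) {
                return true;
            }
            // Otherwise the key must be "<include|exclude>#<integer>"
            String[] splits = key.split("#");
            return splits.length == 2
                    && (splits[0].equals("include") || splits[0].equals("exclude"))
                    && splits[1].chars().allMatch(Character::isDigit);
        }

        public static void main(String[] args) {
            System.out.println(isValidKey("include"));       // true
            System.out.println(isValidKey("exclude#2"));     // true
            System.out.println(isValidKey("exclude#"));      // false, see bad-name-2
            System.out.println(isValidKey("exclude#hello")); // false, see bad-name-3
        }
    }
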
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage_one_partition.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage_one_partition.xml
new file mode 100644
index 0000000..32a56b2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage_one_partition.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+  <test-group name="external-dataset">
+    <test-case FilePath="external-dataset"  check-warnings="true">
+      <compilation-unit name="common/csv-header">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/csv-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/csv-no-header">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/csv-no-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/tsv-header">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/tsv-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/tsv-no-header">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/tsv-no-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/csv-warnings">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/csv-warnings</output-dir>
+        <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv line 2 field 3: some fields are missing</expected-warn>
+        <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv line 2 field 0: malformed input record ended abruptly</expected-warn>
+        <expected-warn>Parsing error at  line 2 field 0: malformed input record ended abruptly</expected-warn>
+
+        <expected-warn>Parsing error at  line 5 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 2 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 11 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 3 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 4 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 7 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 13 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 12 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 9 field 6: a quote should be in the beginning</expected-warn>
+
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 5 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 2 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 11 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 3 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 4 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 7 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 13 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 12 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 9 field 6: a quote should be in the beginning</expected-warn>
+
+        <expected-warn>Parsing error at data_dir/error1_line_num.csv line 3 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
+        <expected-warn>Parsing error at data_dir/error2_line_num.csv line 4 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/tsv-warnings">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/tsv-warnings</output-dir>
+        <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv line 2 field 3: some fields are missing</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/json-warnings">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/json-warnings</output-dir>
+        <expected-warn>Parsing error at data_dir/1.json line 3 field 0: malformed input record ended abruptly</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/jsonl">
+        <placeholder name="adapter" value="AZUREBLOB" />
+        <output-dir compare="Text">common/jsonl</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 4fed101..37c8a61 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -461,6 +461,10 @@
       <artifactId>auth</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-blob</artifactId>
+    </dependency>
+    <dependency>
         <groupId>org.msgpack</groupId>
         <artifactId>msgpack-core</artifactId>
     </dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
new file mode 100644
index 0000000..898f828
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.abstracts;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
+import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractExternalInputStream extends AbstractMultipleInputStream {
+
+    protected static final Logger LOGGER = LogManager.getLogger();
+
+    // Configuration
+    protected final Map<String, String> configuration;
+
+    // File fields
+    protected final List<String> filePaths;
+    protected int nextFileIndex = 0;
+
+    public AbstractExternalInputStream(Map<String, String> configuration, List<String> filePaths) {
+        this.configuration = configuration;
+        this.filePaths = filePaths;
+    }
+
+    @Override
+    protected boolean advance() throws IOException {
+        // No files to read for this partition
+        if (filePaths == null || filePaths.isEmpty()) {
+            return false;
+        }
+
+        // Finished reading all the files
+        if (nextFileIndex >= filePaths.size()) {
+            if (in != null) {
+                CleanupUtils.close(in, null);
+            }
+            return false;
+        }
+
+        // Close the current stream before going to the next one
+        if (in != null) {
+            CleanupUtils.close(in, null);
+        }
+
+        boolean isAvailableStream = getInputStream();
+        nextFileIndex++; // Always point to next file after getting the current stream
+        if (!isAvailableStream) {
+            return advance();
+        }
+
+        if (notificationHandler != null) {
+            notificationHandler.notifyNewSource();
+        }
+        return true;
+    }
+
+    protected abstract boolean getInputStream() throws IOException;
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (in != null) {
+            CleanupUtils.close(in, null);
+        }
+    }
+
+    @Override
+    public String getStreamName() {
+        return getStreamNameAt(nextFileIndex - 1);
+    }
+
+    @Override
+    public String getPreviousStreamName() {
+        return getStreamNameAt(nextFileIndex - 2);
+    }
+
+    private String getStreamNameAt(int fileIndex) {
+        return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
+    }
+}
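
The contract advance() imposes on subclasses: getInputStream() opens filePaths.get(nextFileIndex) into the inherited "in" field and returns false, rather than throwing, when the file cannot be served, so the base class skips to the next file. A minimal sketch with a hypothetical local-file subclass (illustration only, not part of this patch):

    package org.apache.asterix.external.input.record.reader.abstracts;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    public class LocalFileExternalInputStream extends AbstractExternalInputStream {

        public LocalFileExternalInputStream(Map<String, String> configuration, List<String> filePaths) {
            super(configuration, filePaths);
        }

        @Override
        protected boolean getInputStream() throws IOException {
            File file = new File(filePaths.get(nextFileIndex));
            if (!file.isFile()) {
                // Returning false lets advance() move on to the next path
                return false;
            }
            in = new FileInputStream(file);
            return true;
        }
    }
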
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
new file mode 100644
index 0000000..ca55b6f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.abstracts;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public abstract class AbstractExternalInputStreamFactory implements IInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected Map<String, String> configuration;
+    protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+    protected transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    @Override
+    public abstract AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return partitionConstraint;
+    }
+
+    @Override
+    public abstract void configure(IServiceContext ctx, Map<String, String> configuration,
+            IWarningCollector warningCollector) throws AlgebricksException;
+
+    /**
+     * Finds the smallest workload and returns it
+     *
+     * @return the smallest workload
+     */
+    protected PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
+        PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
+        for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
+            // A partition with total size 0 has no files yet; pick it immediately
+            if (partition.getTotalSize() == 0) {
+                smallest = partition;
+                break;
+            }
+            if (partition.getTotalSize() < smallest.getTotalSize()) {
+                smallest = partition;
+            }
+        }
+
+        return smallest;
+    }
+
+    protected IncludeExcludeMatcher getIncludeExcludeMatchers() throws CompilationException {
+        // Get and compile the patterns for include/exclude if provided
+        List<Matcher> includeMatchers = new ArrayList<>();
+        List<Matcher> excludeMatchers = new ArrayList<>();
+        String pattern = null;
+        try {
+            for (Map.Entry<String, String> entry : configuration.entrySet()) {
+                if (entry.getKey().startsWith(KEY_INCLUDE)) {
+                    pattern = entry.getValue();
+                    includeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
+                } else if (entry.getKey().startsWith(KEY_EXCLUDE)) {
+                    pattern = entry.getValue();
+                    excludeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
+                }
+            }
+        } catch (PatternSyntaxException ex) {
+            throw new CompilationException(ErrorCode.INVALID_REGEX_PATTERN, pattern);
+        }
+
+        IncludeExcludeMatcher includeExcludeMatcher;
+        if (!includeMatchers.isEmpty()) {
+            includeExcludeMatcher =
+                    new IncludeExcludeMatcher(includeMatchers, ExternalDataUtils::matchPatterns);
+        } else if (!excludeMatchers.isEmpty()) {
+            includeExcludeMatcher = new IncludeExcludeMatcher(excludeMatchers,
+                    (matchers1, key) -> !ExternalDataUtils.matchPatterns(matchers1, key));
+        } else {
+            includeExcludeMatcher = new IncludeExcludeMatcher(Collections.emptyList(), (matchers1, key) -> true);
+        }
+
+        return includeExcludeMatcher;
+    }
+
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final List<String> filePaths = new ArrayList<>();
+        private long totalSize = 0;
+
+        public PartitionWorkLoadBasedOnSize() {
+        }
+
+        public List<String> getFilePaths() {
+            return filePaths;
+        }
+
+        public void addFilePath(String filePath, long size) {
+            this.filePaths.add(filePath);
+            this.totalSize += size;
+        }
+
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        @Override
+        public String toString() {
+            return "Files: " + filePaths.size() + ", Total Size: " + totalSize;
+        }
+    }
+
+    public static class IncludeExcludeMatcher implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final List<Matcher> matchersList;
+        private final BiPredicate<List<Matcher>, String> predicate;
+
+        public IncludeExcludeMatcher(List<Matcher> matchersList, BiPredicate<List<Matcher>, String> predicate) {
+            this.matchersList = matchersList;
+            this.predicate = predicate;
+        }
+
+        public List<Matcher> getMatchersList() {
+            return matchersList;
+        }
+
+        public BiPredicate<List<Matcher>, String> getPredicate() {
+            return predicate;
+        }
+    }
+}
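
A standalone sketch of the predicate semantics IncludeExcludeMatcher encodes: with include patterns, matching keys are kept; with exclude patterns, matching keys are dropped; with neither, everything passes. The matchPatterns body below is an assumption about what ExternalDataUtils.matchPatterns does (reset the shared Matcher per key), included only to keep the example self-contained:

    import java.util.Collections;
    import java.util.List;
    import java.util.function.BiPredicate;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class IncludeExcludeSemanticsDemo {
        static boolean matchPatterns(List<Matcher> matchers, String key) {
            for (Matcher matcher : matchers) {
                if (matcher.reset(key).matches()) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            // "*.json" as a compiled regex, roughly what patternToRegex produces
            List<Matcher> matchers =
                    Collections.singletonList(Pattern.compile(".*\\.json").matcher(""));
            BiPredicate<List<Matcher>, String> include = IncludeExcludeSemanticsDemo::matchPatterns;
            BiPredicate<List<Matcher>, String> exclude = (ms, k) -> !matchPatterns(ms, k);

            System.out.println(include.test(matchers, "data_dir/1.json")); // true: kept
            System.out.println(exclude.test(matchers, "data_dir/1.json")); // false: dropped
        }
    }
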
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 9e10e6a..e3e53d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -28,59 +28,28 @@ import java.util.zip.GZIPInputStream;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.util.LogRedactionUtil;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
-public class AwsS3InputStream extends AbstractMultipleInputStream {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    // Configuration
-    private final Map<String, String> configuration;
+public class AwsS3InputStream extends AbstractExternalInputStream {
 
     private final S3Client s3Client;
 
-    // File fields
-    private final List<String> filePaths;
-    private int nextFileIndex = 0;
-
     public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
-        this.configuration = configuration;
-        this.filePaths = filePaths;
+        super(configuration, filePaths);
         this.s3Client = buildAwsS3Client(configuration);
     }
 
     @Override
-    protected boolean advance() throws IOException {
-        // No files to read for this partition
-        if (filePaths == null || filePaths.isEmpty()) {
-            return false;
-        }
-
-        // Finished reading all the files
-        if (nextFileIndex >= filePaths.size()) {
-            if (in != null) {
-                CleanupUtils.close(in, null);
-            }
-            return false;
-        }
-
-        // Close the current stream before going to the next one
-        if (in != null) {
-            CleanupUtils.close(in, null);
-        }
-
+    protected boolean getInputStream() throws IOException {
         String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
         GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
         GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
@@ -92,8 +61,7 @@ public class AwsS3InputStream extends AbstractMultipleInputStream {
         } catch (NoSuchKeyException ex) {
             LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
                     + getObjectRequest.bucket());
-            nextFileIndex++;
-            return advance();
+            return false;
         } catch (SdkException ex) {
             throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
         }
@@ -104,11 +72,6 @@ public class AwsS3InputStream extends AbstractMultipleInputStream {
             in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
         }
 
-        // Current file ready, point to the next file
-        nextFileIndex++;
-        if (notificationHandler != null) {
-            notificationHandler.notifyNewSource();
-        }
         return true;
     }
 
@@ -119,35 +82,4 @@ public class AwsS3InputStream extends AbstractMultipleInputStream {
             throw HyracksDataException.create(ex);
         }
     }
-
-    @Override
-    public boolean stop() {
-        return false;
-    }
-
-    @Override
-    public boolean handleException(Throwable th) {
-        return false;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (in != null) {
-            CleanupUtils.close(in, null);
-        }
-    }
-
-    @Override
-    public String getStreamName() {
-        return getStreamNameAt(nextFileIndex - 1);
-    }
-
-    @Override
-    public String getPreviousStreamName() {
-        return getStreamNameAt(nextFileIndex - 2);
-    }
-
-    private String getStreamNameAt(int fileIndex) {
-        return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
-    }
 }
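
The reshaped getInputStream() above treats a missing key as skippable rather than fatal. A self-contained sketch of the same AWS SDK v2 read path (bucket and key names are invented; credentials and region come from the default provider chains):

    import java.io.InputStream;

    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

    public class S3ReadSketch {
        public static void main(String[] args) throws Exception {
            try (S3Client s3 = S3Client.create()) {
                GetObjectRequest request = GetObjectRequest.builder()
                        .bucket("playground")      // invented bucket name
                        .key("data_dir/1.json")    // invented object key
                        .build();
                try (InputStream in = s3.getObject(request)) {
                    System.out.println("first byte: " + in.read());
                } catch (NoSuchKeyException ex) {
                    // Mirrors the stream above: a missing key is skipped, not an error
                    System.out.println("key not found, skipping");
                }
            }
        }
    }
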
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index f3a36ff..08f8dec 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -19,27 +19,20 @@
 package org.apache.asterix.external.input.record.reader.aws;
 
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
 
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -54,35 +47,16 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
-public class AwsS3InputStreamFactory implements IInputStreamFactory {
+public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private Map<String, String> configuration;
-    private final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
-    private transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.STREAM;
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-
     @Override
     public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
         return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return partitionConstraint;
-    }
-
-    @Override
     public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
             throws AlgebricksException {
         this.configuration = configuration;
@@ -93,44 +67,13 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory {
         List<S3Object> filesOnly = new ArrayList<>();
 
         // Ensure the validity of include/exclude
-        ExternalDataUtils.AwsS3.validateIncludeExclude(configuration);
-
-        // Get and compile the patterns for include/exclude if provided
-        List<Matcher> includeMatchers = new ArrayList<>();
-        List<Matcher> excludeMatchers = new ArrayList<>();
-        String pattern = null;
-        try {
-            for (Map.Entry<String, String> entry : configuration.entrySet()) {
-                if (entry.getKey().startsWith(KEY_INCLUDE)) {
-                    pattern = entry.getValue();
-                    includeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
-                } else if (entry.getKey().startsWith(KEY_EXCLUDE)) {
-                    pattern = entry.getValue();
-                    excludeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
-                }
-            }
-        } catch (PatternSyntaxException ex) {
-            throw new CompilationException(ErrorCode.INVALID_REGEX_PATTERN, pattern);
-        }
-
-        List<Matcher> matchersList;
-        BiPredicate<List<Matcher>, String> p;
-        if (!includeMatchers.isEmpty()) {
-            matchersList = includeMatchers;
-            p = (matchers, key) -> ExternalDataUtils.matchPatterns(matchers, key);
-        } else if (!excludeMatchers.isEmpty()) {
-            matchersList = excludeMatchers;
-            p = (matchers, key) -> !ExternalDataUtils.matchPatterns(matchers, key);
-        } else {
-            matchersList = Collections.emptyList();
-            p = (matchers, key) -> true;
-        }
+        ExternalDataUtils.validateIncludeExclude(configuration);
 
         S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
 
         // Get all objects in a bucket and extract the paths to files
         ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
-        ExternalDataUtils.AwsS3.setPrefix(configuration, listObjectsBuilder);
+        listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration));
 
         ListObjectsV2Response listObjectsResponse;
         boolean done = false;
@@ -147,7 +90,9 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory {
                 }
 
                 // Collect the paths to files only
-                collectAndFilterFiles(listObjectsResponse.contents(), p, matchersList, filesOnly);
+                IncludeExcludeMatcher includeExcludeMatcher = getIncludeExcludeMatchers();
+                collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
+                        includeExcludeMatcher.getMatchersList(), filesOnly);
 
                 // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
                 if (!listObjectsResponse.isTruncated()) {
@@ -224,52 +169,4 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory {
             smallest.addFilePath(object.key(), object.size());
         }
     }
-
-    /**
-     * Finds the smallest workload and returns it
-     *
-     * @return the smallest workload
-     */
-    private PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
-        PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
-        for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
-            // If the current total size is 0, add the file directly as this is a first time partition
-            if (partition.getTotalSize() == 0) {
-                smallest = partition;
-                break;
-            }
-            if (partition.getTotalSize() < smallest.getTotalSize()) {
-                smallest = partition;
-            }
-        }
-
-        return smallest;
-    }
-
-    private static class PartitionWorkLoadBasedOnSize implements Serializable {
-        private static final long serialVersionUID = 1L;
-        private final List<String> filePaths = new ArrayList<>();
-        private long totalSize = 0;
-
-        PartitionWorkLoadBasedOnSize() {
-        }
-
-        public List<String> getFilePaths() {
-            return filePaths;
-        }
-
-        public void addFilePath(String filePath, long size) {
-            this.filePaths.add(filePath);
-            this.totalSize += size;
-        }
-
-        public long getTotalSize() {
-            return totalSize;
-        }
-
-        @Override
-        public String toString() {
-            return "Files: " + filePaths.size() + ", Total Size: " + totalSize;
-        }
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
new file mode 100644
index 0000000..358c412
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.LogRedactionUtil;
+
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobStorageException;
+
+public class AzureBlobInputStream extends AbstractExternalInputStream {
+
+    private final BlobServiceClient client;
+
+    public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
+        super(configuration, filePaths);
+        this.client = buildAzureClient(configuration);
+    }
+
+    @Override
+    protected boolean getInputStream() throws IOException {
+        String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+        BlobContainerClient blobContainerClient;
+        BlobClient blob;
+        try {
+            blobContainerClient = client.getBlobContainerClient(container);
+            blob = blobContainerClient.getBlobClient(filePaths.get(nextFileIndex));
+            in = blob.openInputStream();
+
+            // Use gzip stream if needed
+            String filename = filePaths.get(nextFileIndex).toLowerCase();
+            if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
+                in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+            }
+        } catch (BlobStorageException ex) {
+            if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+                LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(filePaths.get(nextFileIndex)) + " was not "
+                        + "found in container " + container);
+                return false;
+            } else {
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+            }
+        } catch (Exception ex) {
+            throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+        }
+
+        return true;
+    }
+
+    private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+        try {
+            return ExternalDataUtils.Azure.buildAzureClient(configuration);
+        } catch (CompilationException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+}
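
A self-contained sketch of the Azure SDK calls the stream relies on (the connection string, container, and blob names are placeholders; real values come from the dataset configuration):

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import com.azure.storage.blob.BlobClient;
    import com.azure.storage.blob.BlobServiceClient;
    import com.azure.storage.blob.BlobServiceClientBuilder;

    public class AzureBlobReadSketch {
        public static void main(String[] args) throws Exception {
            String connectionString = "DefaultEndpointsProtocol=https;AccountName=myaccount;"
                    + "AccountKey=bXlrZXk=;EndpointSuffix=core.windows.net";
            BlobServiceClient service =
                    new BlobServiceClientBuilder().connectionString(connectionString).buildClient();
            BlobClient blob = service.getBlobContainerClient("playground") // invented container
                    .getBlobClient("data_dir/1.json");                     // invented blob name
            try (InputStream in = blob.openInputStream();
                    BufferedReader reader =
                            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                reader.lines().forEach(System.out::println);
            }
        }
    }
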
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
new file mode 100644
index 0000000..9b41b08
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
+public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+        return new AzureBlobInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+            throws AlgebricksException {
+        this.configuration = configuration;
+        ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+
+        String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+
+        List<BlobItem> filesOnly = new ArrayList<>();
+
+        // Ensure the validity of include/exclude
+        ExternalDataUtils.validateIncludeExclude(configuration);
+
+        BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
+        BlobContainerClient blobContainer;
+        try {
+            blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+            listBlobsOptions.setPrefix(ExternalDataUtils.getPrefix(configuration));
+            Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+            // Collect the paths to files only
+            IncludeExcludeMatcher includeExcludeMatcher = getIncludeExcludeMatchers();
+            collectAndFilterFiles(blobItems, includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Warn if no files are returned
+            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+                Warning warning =
+                        WarningUtil.forAsterix(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                warningCollector.warn(warning);
+            }
+
+            // Partition constraints
+            partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
+            int partitionsCount = partitionConstraint.getLocations().length;
+
+            // Distribute work load amongst the partitions
+            distributeWorkLoad(filesOnly, partitionsCount);
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+        }
+    }
+
+    /**
+     * Collects the file items only, excluding any folders
+     *
+     * @param items storage items
+     * @param predicate predicate to test with for file filtration
+     * @param matchers include/exclude matchers to test against
+     * @param filesOnly List containing the files only (excluding folders)
+     */
+    private void collectAndFilterFiles(Iterable<BlobItem> items, BiPredicate<List<Matcher>, String> predicate,
+            List<Matcher> matchers, List<BlobItem> filesOnly) {
+        for (BlobItem item : items) {
+            String uri = item.getName();
+
+            // skip folders
+            if (uri.endsWith("/")) {
+                continue;
+            }
+
+            // Keep the file if it passes the include/exclude filter (no filter keeps everything)
+            if (predicate.test(matchers, uri)) {
+                filesOnly.add(item);
+            }
+        }
+    }
+
+    /**
+     * To utilize the parallelism efficiently, the workload is distributed among the partitions based on
+     * file size.
+     *
+     * Example:
+     * File1 1mb, File2 300kb, File3 300kb, File4 300kb
+     *
+     * Distribution:
+     * Partition1: [File1]
+     * Partition2: [File2, File3, File4]
+     *
+     * @param items blob items to distribute
+     * @param partitionsCount Partitions count
+     */
+    private void distributeWorkLoad(List<BlobItem> items, int partitionsCount) {
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+        }
+
+        for (BlobItem item : items) {
+            PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
+            smallest.addFilePath(item.getName(), item.getProperties().getContentLength());
+        }
+    }
+}
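
A standalone paraphrase of the greedy, size-based policy described in the distributeWorkLoad javadoc, reproducing its File1/File2/File3/File4 example with two partitions assumed (names and sizes are the javadoc's, not real data):

    import java.util.ArrayList;
    import java.util.List;

    public class WorkloadDistributionSketch {
        public static void main(String[] args) {
            String[] names = { "File1", "File2", "File3", "File4" };
            long[] sizes = { 1_000_000, 300_000, 300_000, 300_000 };
            int partitionsCount = 2;

            List<List<String>> assignments = new ArrayList<>();
            long[] totals = new long[partitionsCount];
            for (int i = 0; i < partitionsCount; i++) {
                assignments.add(new ArrayList<>());
            }

            // Each file goes to the partition with the smallest total size so far
            for (int f = 0; f < names.length; f++) {
                int smallest = 0;
                for (int p = 1; p < partitionsCount; p++) {
                    if (totals[p] < totals[smallest]) {
                        smallest = p;
                    }
                }
                assignments.get(smallest).add(names[f]);
                totals[smallest] += sizes[f];
            }

            System.out.println(assignments); // [[File1], [File2, File3, File4]]
        }
    }
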
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
new file mode 100644
index 0000000..27e1b02
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class AzureBlobReaderFactory extends StreamRecordReaderFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final List<String> recordReaderNames =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB);
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+        return streamFactory.getPartitionConstraint();
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+            throws AlgebricksException, HyracksDataException {
+        this.configuration = configuration;
+
+        // Stream factory
+        streamFactory = new AzureBlobInputStreamFactory();
+        streamFactory.configure(ctx, configuration, warningCollector);
+
+        // record reader
+        recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 252ed5b..e20270c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -131,6 +131,7 @@ public class ExternalDataConstants {
     public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = "socket_adapter";
     public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
     public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3";
+    public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
 
     /**
      * HDFS class names
@@ -290,4 +291,25 @@ public class ExternalDataConstants {
         public static final String DEFINITION_FIELD_NAME = "definition";
         public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
     }
+
+    public static class AzureBlob {
+        private AzureBlob() {
+            throw new AssertionError("do not instantiate");
+        }
+
+        public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
+        public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
+        public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature";
+        public static final String CONTAINER_NAME_FIELD_NAME = "container";
+        public static final String DEFINITION_FIELD_NAME = "definition";
+        public static final String BLOB_ENDPOINT_FIELD_NAME = "blobEndpoint";
+        public static final String ENDPOINT_SUFFIX_FIELD_NAME = "endpointSuffix";
+
+        // Connection string requires PascalCase (MyFieldFormat)
+        public static final String CONNECTION_STRING_ACCOUNT_NAME = "AccountName";
+        public static final String CONNECTION_STRING_ACCOUNT_KEY = "AccountKey";
+        public static final String CONNECTION_STRING_SHARED_ACCESS_SIGNATURE = "SharedAccessSignature";
+        public static final String CONNECTION_STRING_BLOB_ENDPOINT = "BlobEndpoint";
+        public static final String CONNECTION_STRING_ENDPOINT_SUFFIX = "EndpointSuffix";
+    }
 }
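
The PascalCase constants above exist because Azure connection strings are semicolon-separated PascalCase key=value pairs. A sketch of how they compose (account name and key are placeholders; the exact assembly performed by ExternalDataUtils.Azure.buildAzureClient is not shown in this hunk):

    public class AzureConnectionStringSketch {
        public static void main(String[] args) {
            String accountName = "myaccount"; // placeholder
            String accountKey = "bXlrZXk=";   // placeholder
            String connectionString = "AccountName=" + accountName
                    + ";AccountKey=" + accountKey
                    + ";EndpointSuffix=core.windows.net";
            System.out.println(connectionString);
            // AccountName=myaccount;AccountKey=bXlrZXk=;EndpointSuffix=core.windows.net
        }
    }
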
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 3462b76..86eb5e0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.util;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ACCOUNT_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ACCOUNT_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_BLOB_ENDPOINT;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ENDPOINT_SUFFIX;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
@@ -67,6 +71,12 @@ import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -500,6 +510,9 @@ public class ExternalDataUtils {
             case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
                 ExternalDataUtils.AwsS3.validateProperties(configuration, srcLoc, collector);
                 break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
+                ExternalDataUtils.Azure.validateProperties(configuration, srcLoc, collector);
+                break;
             default:
                 // Nothing needs to be done
                 break;
@@ -612,6 +625,63 @@ public class ExternalDataUtils {
         return result.toString();
     }
 
+    /**
+     * Returns the "definition" value, appending a trailing "/" if missing, for use as the listing
+     * prefix; returns an empty string if no definition is provided. S3 and Azure Blob share the same
+     * "definition" key, so either adapter's constant resolves to the same property.
+     *
+     * @param configuration configuration
+     * @return the adjusted prefix, or an empty string
+     */
+    public static String getPrefix(Map<String, String> configuration) {
+        String definition = configuration.get(ExternalDataConstants.AzureBlob.DEFINITION_FIELD_NAME);
+        if (definition != null && !definition.isEmpty()) {
+            return definition + (!definition.endsWith("/") ? "/" : "");
+        }
+        return "";
+    }
+
+    /**
+     * Validates that the include and exclude parameters are not provided together and that their keys
+     * follow the accepted format: "include" or "include#&lt;integer&gt;" (likewise for "exclude")
+     *
+     * @param configuration configuration map
+     * @throws CompilationException if a key is malformed or include and exclude are both present
+     */
+    public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException {
+        // Ensure that include and exclude are not provided at the same time, and that each key is well formed
+        List<Map.Entry<String, String>> includes = new ArrayList<>();
+        List<Map.Entry<String, String>> excludes = new ArrayList<>();
+
+        // Accepted formats are include, include#1, include#2, ... etc, same for excludes
+        for (Map.Entry<String, String> entry : configuration.entrySet()) {
+            String key = entry.getKey();
+
+            if (key.equals(ExternalDataConstants.KEY_INCLUDE)) {
+                includes.add(entry);
+            } else if (key.equals(ExternalDataConstants.KEY_EXCLUDE)) {
+                excludes.add(entry);
+            } else if (key.startsWith(ExternalDataConstants.KEY_INCLUDE)
+                    || key.startsWith(ExternalDataConstants.KEY_EXCLUDE)) {
+
+                // Split by the "#", length should be 2, left should be include/exclude, right should be integer
+                String[] splits = key.split("#");
+
+                if (key.startsWith(ExternalDataConstants.KEY_INCLUDE) && splits.length == 2
+                        && splits[0].equals(ExternalDataConstants.KEY_INCLUDE)
+                        && NumberUtils.isIntegerNumericString(splits[1])) {
+                    includes.add(entry);
+                } else if (key.startsWith(ExternalDataConstants.KEY_EXCLUDE) && splits.length == 2
+                        && splits[0].equals(ExternalDataConstants.KEY_EXCLUDE)
+                        && NumberUtils.isIntegerNumericString(splits[1])) {
+                    excludes.add(entry);
+                } else {
+                    throw new CompilationException(ErrorCode.INVALID_PROPERTY_FORMAT, key);
+                }
+            }
+        }
+
+        // Ensure that include and exclude are not provided at the same time
+        if (!includes.isEmpty() && !excludes.isEmpty()) {
+            throw new CompilationException(ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME,
+                    ExternalDataConstants.KEY_INCLUDE, ExternalDataConstants.KEY_EXCLUDE);
+        }
+    }
+
     public static class AwsS3 {
         private AwsS3() {
             throw new AssertionError("do not instantiate");
@@ -667,19 +737,6 @@ public class ExternalDataUtils {
         }
 
         /**
-         * Sets the prefix for the list objects builder if it is available
-         *
-         * @param configuration configuration
-         * @param builder builder
-         */
-        public static void setPrefix(Map<String, String> configuration, ListObjectsV2Request.Builder builder) {
-            String definition = configuration.get(ExternalDataConstants.AwsS3.DEFINITION_FIELD_NAME);
-            if (definition != null) {
-                builder.prefix(definition + (!definition.isEmpty() && !definition.endsWith("/") ? "/" : ""));
-            }
-        }
-
-        /**
          * Validate external dataset properties
          *
          * @param configuration properties
@@ -702,7 +759,7 @@ public class ExternalDataUtils {
                 String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
                 s3Client = buildAwsS3Client(configuration);
                 ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
-                setPrefix(configuration, listObjectsBuilder);
+                listObjectsBuilder.prefix(getPrefix(configuration));
 
                 ListObjectsV2Response response =
                         s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
@@ -726,49 +783,82 @@ public class ExternalDataUtils {
                 }
             }
         }
+    }
+
+    public static class Azure {
+        private Azure() {
+            throw new AssertionError("do not instantiate");
+        }
 
         /**
-         * @param configuration
-         * @throws CompilationException
+         * Builds the Azure Blob Storage client using the provided configuration
+         *
+         * @param configuration properties
+         * @return client
          */
-        public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException {
-            // Ensure that include and exclude are not provided at the same time + ensure valid format or property
-            List<Map.Entry<String, String>> includes = new ArrayList<>();
-            List<Map.Entry<String, String>> excludes = new ArrayList<>();
+        public static BlobServiceClient buildAzureClient(Map<String, String> configuration)
+                throws CompilationException {
+            // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
+            String accountName = configuration.get(ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME);
+            String accountKey = configuration.get(ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME);
+            String blobEndpoint = configuration.get(ExternalDataConstants.AzureBlob.BLOB_ENDPOINT_FIELD_NAME);
+            String endpointSuffix = configuration.get(ExternalDataConstants.AzureBlob.ENDPOINT_SUFFIX_FIELD_NAME);
+
+            // format: name1=value1;name2=value2;....
+            // TODO(Hussain): This will be different when SAS (Shared Access Signature) is introduced
+            StringBuilder connectionString = new StringBuilder();
+            connectionString.append(CONNECTION_STRING_ACCOUNT_NAME).append("=").append(accountName).append(";");
+            connectionString.append(CONNECTION_STRING_ACCOUNT_KEY).append("=").append(accountKey).append(";");
+            connectionString.append(CONNECTION_STRING_BLOB_ENDPOINT).append("=").append(blobEndpoint).append(";");
+
+            if (endpointSuffix != null) {
+                connectionString.append(CONNECTION_STRING_ENDPOINT_SUFFIX).append("=").append(endpointSuffix)
+                        .append(";");
+            }
 
-            // Accepted formats are include, include#1, include#2, ... etc, same for excludes
-            for (Map.Entry<String, String> entry : configuration.entrySet()) {
-                String key = entry.getKey();
+            try {
+                return new BlobServiceClientBuilder().connectionString(connectionString.toString()).buildClient();
+            } catch (Exception ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+            }
+        }
 
-                if (key.equals(ExternalDataConstants.KEY_INCLUDE)) {
-                    includes.add(entry);
-                } else if (key.equals(ExternalDataConstants.KEY_EXCLUDE)) {
-                    excludes.add(entry);
-                } else if (key.startsWith(ExternalDataConstants.KEY_INCLUDE)
-                        || key.startsWith(ExternalDataConstants.KEY_EXCLUDE)) {
-
-                    // Split by the "#", length should be 2, left should be include/exclude, right should be integer
-                    String[] splits = key.split("#");
-
-                    if (key.startsWith(ExternalDataConstants.KEY_INCLUDE) && splits.length == 2
-                            && splits[0].equals(ExternalDataConstants.KEY_INCLUDE)
-                            && NumberUtils.isIntegerNumericString(splits[1])) {
-                        includes.add(entry);
-                    } else if (key.startsWith(ExternalDataConstants.KEY_EXCLUDE) && splits.length == 2
-                            && splits[0].equals(ExternalDataConstants.KEY_EXCLUDE)
-                            && NumberUtils.isIntegerNumericString(splits[1])) {
-                        excludes.add(entry);
-                    } else {
-                        throw new CompilationException(ErrorCode.INVALID_PROPERTY_FORMAT, key);
-                    }
-                }
+        /**
+         * Validate external dataset properties
+         *
+         * @param configuration properties
+         * @param srcLoc source location
+         * @param collector warning collector
+         *
+         * @throws CompilationException Compilation exception
+         */
+        public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+                IWarningCollector collector) throws CompilationException {
+
+            // Check that the format property is present
+            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+                throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
             }
 
-            // TODO: Should include/exclude be a common check or S3 specific?
-            // Ensure either include or exclude are provided, but not both of them
-            if (!includes.isEmpty() && !excludes.isEmpty()) {
-                throw new CompilationException(ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME,
-                        ExternalDataConstants.KEY_INCLUDE, ExternalDataConstants.KEY_EXCLUDE);
+            validateIncludeExclude(configuration);
+
+            // Check that the container is present
+            BlobServiceClient blobServiceClient;
+            try {
+                String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+                blobServiceClient = buildAzureClient(configuration);
+                BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+                // List blobs under the configured prefix to verify that the source contains files
+                ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+                listBlobsOptions.setPrefix(getPrefix(configuration));
+                Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+                if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+                    Warning warning =
+                            WarningUtil.forAsterix(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                    collector.warn(warning);
+                }
+            } catch (Exception ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
             }
         }
     }
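
A note on Azure.buildAzureClient() above: the string it assembles follows Azure's
standard "name=value;" connection-string format, with the CONNECTION_STRING_*
constants presumably expanding to the well-known AccountName, AccountKey,
BlobEndpoint and EndpointSuffix keys. Below is a minimal standalone sketch of the
same validation flow against the azure-storage-blob API; the class name and
account values are hypothetical stand-ins for what validateProperties() reads
from the dataset configuration map:

    import com.azure.storage.blob.BlobContainerClient;
    import com.azure.storage.blob.BlobServiceClient;
    import com.azure.storage.blob.BlobServiceClientBuilder;
    import com.azure.storage.blob.models.ListBlobsOptions;

    public class AzureBlobSmokeTest {
        public static void main(String[] args) {
            // Hypothetical credentials; the real ones come from the configuration map.
            String connectionString = "AccountName=devaccount;"
                    + "AccountKey=bXktZGV2LWtleQ==;"
                    + "BlobEndpoint=http://127.0.0.1:10000/devaccount;";
            BlobServiceClient serviceClient =
                    new BlobServiceClientBuilder().connectionString(connectionString).buildClient();
            BlobContainerClient container = serviceClient.getBlobContainerClient("playground");

            // Mirrors the check above: list blobs under the prefix, report if nothing comes back.
            ListBlobsOptions options = new ListBlobsOptions().setPrefix("json-data/");
            boolean hasFiles = container.listBlobs(options, null).iterator().hasNext();
            System.out.println(hasFiles ? "source has files" : "source returned no files");
        }
    }
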
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index fd3e473..d66c44e 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -21,3 +21,4 @@ org.apache.asterix.external.input.HDFSDataSourceFactory
 org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
 org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
 org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
+org.apache.asterix.external.input.record.reader.azure.AzureBlobReaderFactory
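
The single added line above is what makes the new reader visible to the engine:
IRecordReaderFactory implementations are discovered through the standard
java.util.ServiceLoader mechanism, and this META-INF/services file is the
registry it reads. A sketch of how such registrations are typically enumerated,
using a hypothetical driver class (the engine's actual lookup code may differ):

    import java.util.ServiceLoader;

    import org.apache.asterix.external.api.IRecordReaderFactory;

    public class ListReaderFactories {
        public static void main(String[] args) {
            // Iterates over every implementation named in META-INF/services files on
            // the classpath, now including AzureBlobReaderFactory.
            for (IRecordReaderFactory<?> factory : ServiceLoader.load(IRecordReaderFactory.class)) {
                System.out.println(factory.getClass().getName());
            }
        }
    }
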
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 2c12ab0..8c07792 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -202,6 +202,16 @@
               <noticeUrl>https://raw.githubusercontent.com/awslabs/aws-eventstream-java/7be2dd80e12f8835674c8ffb0f4a2efb64c7b585/NOTICE</noticeUrl>
             </override>
             <override>
+              <gavs>
+                <gav>com.azure:azure-core:1.4.0</gav>
+                <gav>com.azure:azure-core-http-netty:1.5.0</gav>
+                <gav>com.azure:azure-storage-blob:12.6.0</gav>
+                <gav>com.azure:azure-storage-common:12.6.0</gav>
+              </gavs>
+              <noticeUrl>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/NOTICE.txt</noticeUrl>
+              <url>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/LICENSE.txt</url>
+            </override>
+            <override>
               <gav>org.mindrot:jbcrypt:0.4</gav>
               <url>http://www.mindrot.org/files/jBCrypt/LICENSE</url>
             </override>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 712dcc1..d69d62b 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -85,6 +85,7 @@
     <jacoco.version>0.7.6.201602180812</jacoco.version>
     <log4j.version>2.13.3</log4j.version>
     <awsjavasdk.version>2.10.83</awsjavasdk.version>
+    <azurejavasdk.version>12.6.0</azurejavasdk.version>
 
     <implementation.title>Apache AsterixDB - ${project.name}</implementation.title>
     <implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -1492,6 +1493,55 @@
         <artifactId>akka-http-core_2.12</artifactId>
         <version>10.1.0</version>
       </dependency>
+      <!-- Azure Blob Storage start -->
+      <dependency>
+        <groupId>com.azure</groupId>
+        <artifactId>azure-storage-blob</artifactId>
+        <version>${azurejavasdk.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler-proxy</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http2</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-unix-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <!-- Azure Blob Storage end -->
       <dependency>
         <groupId>org.mindrot</groupId>
         <artifactId>jbcrypt</artifactId>
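
The long exclusion list above is deliberate: azure-storage-blob pulls in its own
set of Netty artifacts, and excluding them here presumably lets the Netty version
already managed by the build win, avoiding mixed-version Netty jars on the server
classpath. After a change like this, the effective resolution can be confirmed
with the standard Maven dependency plugin:

    mvn dependency:tree -Dincludes=io.netty
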
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index e457628..500a035 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -529,4 +529,118 @@
       </properties>
     </project>
   </supplement>
+
+  <!-- Azure SDK for Java start -->
+  <!-- com.azure does not contain any embedded LICENSE or NOTICE file -->
+  <!-- see https://github.com/Azure/azure-sdk-for-java -->
+  <supplement>
+    <project>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-blob</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedLicense>12.6.0</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>12.6.0</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreLicenseOverride>12.6.0</license.ignoreLicenseOverride>
+        <license.ignoreNoticeOverride>12.6.0</license.ignoreNoticeOverride>
+      </properties>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-common</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedLicense>12.6.0</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>12.6.0</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreLicenseOverride>12.6.0</license.ignoreLicenseOverride>
+        <license.ignoreNoticeOverride>12.6.0</license.ignoreNoticeOverride>
+      </properties>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-core</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedLicense>1.4.0</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>1.4.0</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreLicenseOverride>1.4.0</license.ignoreLicenseOverride>
+        <license.ignoreNoticeOverride>1.4.0</license.ignoreNoticeOverride>
+      </properties>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-core-http-netty</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedLicense>1.5.0</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>1.5.0</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreLicenseOverride>1.5.0</license.ignoreLicenseOverride>
+        <license.ignoreNoticeOverride>1.5.0</license.ignoreNoticeOverride>
+      </properties>
+    </project>
+  </supplement>
+  <!-- Azure SDK for Java end -->
+
+  <!-- jackson-datatype-jsr contains embedded license but has no NOTICE -->
+  <!-- See https://github.com/FasterXML/jackson-modules-java8 -->
+  <supplement>
+    <project>
+      <groupId>com.fasterxml.jackson.datatype</groupId>
+      <artifactId>jackson-datatype-jsr310</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedNotice>2.10.1</license.ignoreMissingEmbeddedNotice>
+      </properties>
+    </project>
+  </supplement>
+
+  <!-- com.fasterxml.woodstox contains embedded license but has no NOTICE -->
+  <!-- See https://github.com/FasterXML/woodstox -->
+  <supplement>
+    <project>
+      <groupId>com.fasterxml.woodstox</groupId>
+      <artifactId>woodstox-core</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedNotice>6.0.2</license.ignoreMissingEmbeddedNotice>
+      </properties>
+    </project>
+  </supplement>
+
+  <!-- org.codehaus.woodstox contains embedded license but has no NOTICE -->
+  <!-- See https://github.com/FasterXML/stax2-api -->
+  <supplement>
+    <project>
+      <groupId>org.codehaus.woodstox</groupId>
+      <artifactId>stax2-api</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedNotice>4.2</license.ignoreMissingEmbeddedNotice>
+      </properties>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>io.projectreactor</groupId>
+      <artifactId>reactor-core</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedLicense>3.3.3.RELEASE</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>3.3.3.RELEASE</license.ignoreMissingEmbeddedNotice>
+      </properties>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>io.projectreactor.netty</groupId>
+      <artifactId>reactor-netty</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedLicense>0.9.5.RELEASE</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>0.9.5.RELEASE</license.ignoreMissingEmbeddedNotice>
+      </properties>
+    </project>
+  </supplement>
 </supplementalDataModels>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_LICENSE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_LICENSE.txt
new file mode 100644
index 0000000..49d2166
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_LICENSE.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Microsoft
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_NOTICE.txt
new file mode 100644
index 0000000..76791aa
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_NOTICE.txt
@@ -0,0 +1,159 @@
+NOTICES AND INFORMATION
+Do Not Translate or Localize
+
+This software incorporates material from third parties. Microsoft makes certain
+open source code available at https://3rdpartysource.microsoft.com, or you may
+send a check or money order for US $5.00, including the product name, the open
+source component name, and version number, to:
+
+Source Code Compliance Team
+Microsoft Corporation
+One Microsoft Way
+Redmond, WA 98052
+USA
+
+Notwithstanding any other terms, you may reverse engineer this software to the
+extent required to debug changes to any libraries licensed under the GNU Lesser
+General Public License.
+
+------------------------------------------------------------------------------
+
+Azure SDK for Java uses third-party libraries or other resources that may be
+distributed under licenses different than the Azure SDK for Java software.
+
+In the event that we accidentally failed to list a required notice, please
+bring it to our attention. Post an issue or email us:
+
+           azjavasdkhelp@microsoft.com
+
+The attached notices are provided for information only.
+
+License notice for Hamcrest
+------------------------------------------------------------------------------
+
+The 3-Clause BSD License
+
+Copyright (c) 2000-2015 www.hamcrest.org
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer. Redistributions in binary form must reproduce
+the above copyright notice, this list of conditions and the following disclaimer in
+the documentation and/or other materials provided with the distribution.
+
+Neither the name of Hamcrest nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+License notice for Slf4j API
+------------------------------------------------------------------------------
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free  of charge, to any person obtaining
+ a  copy  of this  software  and  associated  documentation files  (the
+ "Software"), to  deal in  the Software without  restriction, including
+ without limitation  the rights to  use, copy, modify,  merge, publish,
+ distribute,  sublicense, and/or sell  copies of  the Software,  and to
+ permit persons to whom the Software  is furnished to do so, subject to
+ the following conditions:
+
+ The  above  copyright  notice  and  this permission  notice  shall  be
+ included in all copies or substantial portions of the Software.
+
+ THE  SOFTWARE IS  PROVIDED  "AS  IS", WITHOUT  WARRANTY  OF ANY  KIND,
+ EXPRESS OR  IMPLIED, INCLUDING  BUT NOT LIMITED  TO THE  WARRANTIES OF
+ MERCHANTABILITY,    FITNESS    FOR    A   PARTICULAR    PURPOSE    AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+License notice for Slf4j Simple
+------------------------------------------------------------------------------
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free  of charge, to any person obtaining
+ a  copy  of this  software  and  associated  documentation files  (the
+ "Software"), to  deal in  the Software without  restriction, including
+ without limitation  the rights to  use, copy, modify,  merge, publish,
+ distribute,  sublicense, and/or sell  copies of  the Software,  and to
+ permit persons to whom the Software  is furnished to do so, subject to
+ the following conditions:
+
+ The  above  copyright  notice  and  this permission  notice  shall  be
+ included in all copies or substantial portions of the Software.
+
+ THE  SOFTWARE IS  PROVIDED  "AS  IS", WITHOUT  WARRANTY  OF ANY  KIND,
+ EXPRESS OR  IMPLIED, INCLUDING  BUT NOT LIMITED  TO THE  WARRANTIES OF
+ MERCHANTABILITY,    FITNESS    FOR    A   PARTICULAR    PURPOSE    AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+License notice for Guava (https://github.com/google/guava)
+------------------------------------------------------------------------------
+
+Copyright (C) 2010 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+in compliance with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed under the License
+is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+or implied. See the License for the specific language governing permissions and limitations under
+the License.
+
+License notice for Netty
+------------------------------------------------------------------------------
+
+Copyright 2014 The Netty Project
+
+The Netty Project licenses this file to you under the Apache License,
+version 2.0 (the "License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at:
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+
+License notice for JUG Java Uuid Generator
+------------------------------------------------------------------------------
+
+JUG Java Uuid Generator
+
+Copyright (c) 2002- Tatu Saloranta, tatu.saloranta@iki.fi
+
+Licensed under the License specified in the file LICENSE which is
+included with the source code.
+You may not use this file except in compliance with the License.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file