Posted to commits@asterixdb.apache.org by mb...@apache.org on 2020/05/08 13:16:58 UTC

[asterixdb] 05/06: [ASTERIXDB-2697]: Implementing AWS S3 as an external data source

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit bc0f7e04ce2dfe8fc7c89e0f80d2d98bf04c83dc
Author: Hussain Towaileb <Hu...@Gmail.com>
AuthorDate: Mon Mar 30 16:10:30 2020 +0300

    [ASTERIXDB-2697]: Implementing AWS S3 as an external data source
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Added an external reader for AWS S3.
    - Updated the query translator to include the WITH
      parameters in the dataset details when creating
      an external dataset.
    - Added a test case for AWS S3 that uses an S3 mock server
      to avoid requiring real credentials (see the example
      DDL below).
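    - For illustration, the new test case
      (external_dataset.000.ddl.sqlpp) declares the external
      dataset against the S3 adapter as follows, using dummy
      credentials and the local endpoint of the S3 mock server:

          create external dataset test(test) using S3 (
          ("accessKey"="dummyAccessKey"),
          ("secretKey"="dummySecretKey"),
          ("region"="us-west-2"),
          ("serviceEndpoint"="http://localhost:8001"),
          ("container"="playground"),
          ("definition"="json-data/reviews"),
          ("format"="json")
          );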
    
    Change-Id: I71d89116c0bb404c9621b16f21a6a31cbf7bb7f6
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5025
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
---
 asterixdb/asterix-app/pom.xml                      |  33 +++
 .../asterix/app/external/ExternalLibraryUtils.java |   2 +-
 .../asterix/app/translator/QueryTranslator.java    |  22 ++
 .../asterix/hyracks/bootstrap/CCApplication.java   |  10 +-
 .../org/apache/asterix/utils/FeedOperations.java   |  12 +-
 .../aws/AwsS3ExternalDatasetTest.java              | 215 ++++++++++++++++++
 .../resources/runtimets/only_external_dataset.xml  |  23 ++
 .../aws/s3/000/external_dataset.000.ddl.sqlpp}     |  23 +-
 .../aws/s3/000/external_dataset.001.query.sqlpp}   |  10 +-
 .../aws/s3/000/external_dataset.002.ddl.sqlpp}     |   9 +-
 .../aws/s3/000/external_dataset.001.adm            |  14 ++
 .../results/feeds/feeds_01/feeds_01.1.adm          |   2 +-
 .../runtimets/testsuite_external_dataset.xml       |  28 +++
 .../common/dataflow/ICcApplicationContext.java     |  16 ++
 .../asterix/common/external}/IAdapterFactory.java  |  31 +--
 .../common/external/IAdapterFactoryService.java}   |  15 +-
 .../common/external}/IDataSourceAdapter.java       |   2 +-
 asterixdb/asterix-external-data/pom.xml            |  12 +
 .../factory/AdapterFactoryService.java}            |  18 +-
 .../adapter/factory/GenericAdapterFactory.java     |   6 +-
 .../external/api/IIndexingAdapterFactory.java      |   2 +-
 .../asterix/external/api/ITypedAdapterFactory.java |  54 +++++
 .../external/dataset/adapter/FeedAdapter.java      |   2 +-
 .../external/dataset/adapter/GenericAdapter.java   |   2 +-
 .../input/record/reader/aws/AwsS3InputStream.java  | 163 ++++++++++++++
 .../record/reader/aws/AwsS3InputStreamFactory.java | 250 +++++++++++++++++++++
 .../record/reader/aws/AwsS3ReaderFactory.java      |  90 ++++++++
 .../operators/ExternalScanOperatorDescriptor.java  |   8 +-
 .../operators/FeedIntakeOperatorDescriptor.java    |  16 +-
 .../operators/FeedIntakeOperatorNodePushable.java  |   4 +-
 .../external/provider/AdapterFactoryProvider.java  |   9 +-
 .../provider/DatasourceFactoryProvider.java        |   1 +
 .../util/ExternalDataCompatibilityUtils.java       |   7 +-
 .../external/util/ExternalDataConstants.java       |  23 ++
 ...pache.asterix.external.api.IRecordReaderFactory |   1 +
 .../library/adapter/TestTypedAdapterFactory.java   |   6 +-
 .../asterix/lang/common/statement/DatasetDecl.java |   4 +-
 .../common/util/DatasetDeclParametersUtil.java     |  18 +-
 .../metadata/bootstrap/MetadataBootstrap.java      |   7 +-
 .../metadata/declared/DatasetDataSource.java       |   4 +-
 .../metadata/declared/LoadableDataSource.java      |   4 +-
 .../metadata/declared/MetadataProvider.java        |  18 +-
 .../metadata/entities/DatasourceAdapter.java       |   2 +-
 .../DatasourceAdapterTupleTranslator.java          |   2 +-
 .../asterix/metadata/feeds/FeedMetadataUtil.java   |  22 +-
 .../metadata/utils/ExternalIndexingOperations.java |   4 +-
 .../runtime/utils/CcApplicationContext.java        |  12 +-
 asterixdb/asterix-server/pom.xml                   |   6 +
 asterixdb/pom.xml                                  |  79 +++++++
 ...streams_reactive-streams-jvm_v1.0.2_COPYING.txt | 121 ++++++++++
 ...streams_reactive-streams-jvm_v1.0.2_LICENSE.txt |   8 +
 51 files changed, 1313 insertions(+), 139 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index ac2c303..219595b 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -699,5 +699,38 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <!-- AWS -->
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sdk-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>regions</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>auth</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- Mock for AWS S3 -->
+    <dependency>
+      <groupId>io.findify</groupId>
+      <artifactId>s3mock_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- Needed for the s3 mock -->
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-http-core_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index a989941..c185340 100755
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -34,9 +34,9 @@ import javax.xml.bind.Unmarshaller;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.external.library.ExternalLibrary;
 import org.apache.asterix.external.library.LibraryAdapter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ce1a354..b8a048d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -155,6 +155,8 @@ import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataLockUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.object.base.AdmStringNode;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -646,9 +648,16 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                             keySourceIndicators, partitioningTypes, autogenerated, filterField);
                     break;
                 case EXTERNAL:
+                    validateExternalDatasetRequirements(appCtx, metadataProvider, mdTxnCtx, dd);
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
                     Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
 
+                    // Add the withObjectNode items to the external dataset properties
+                    if (!dd.getWithObjectNode().isEmpty()) {
+                        AdmObjectNode withObjectNode = dd.getWithObjectNode();
+                        dd.getWithObjectNode().getFieldNames().iterator().forEachRemaining(fieldName -> properties
+                                .put(fieldName, ((AdmStringNode) withObjectNode.get(fieldName)).get()));
+                    }
                     datasetDetails =
                             new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT);
                     break;
@@ -3051,4 +3060,17 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
         }
     }
+
+    /**
+     * Performs any required validation before creating an external dataset
+     *
+     * @param appContext {@link ICcApplicationContext} context
+     * @param metadataProvider {@link MetadataProvider} metadata provider
+     * @param mdTxnCtx {@link MetadataTransactionContext} metadata transaction context
+     * @param datasetDecl {@link DatasetDecl} dataset declaration statement
+     */
+    protected void validateExternalDatasetRequirements(ICcApplicationContext appContext,
+            MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx, DatasetDecl datasetDecl)
+            throws Exception {
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 26c092f..fc912b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -69,9 +69,11 @@ import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IAdapterFactoryService;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.external.adapter.factory.AdapterFactoryService;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.messaging.CCMessageBroker;
@@ -154,7 +156,7 @@ public class CCApplication extends BaseCCApplication {
         ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions()));
         IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
         appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator,
-                () -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager);
+                () -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager, new AdapterFactoryService());
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
             System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress());
@@ -182,10 +184,12 @@ public class CCApplication extends BaseCCApplication {
     protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager,
             IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator,
             IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory,
-            CCExtensionManager ccExtensionManager) throws AlgebricksException, IOException {
+            CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService)
+            throws AlgebricksException, IOException {
         return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
                 globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
-                new MetadataLockManager(), receptionistFactory, configValidatorFactory, ccExtensionManager);
+                new MetadataLockManager(), receptionistFactory, configValidatorFactory, ccExtensionManager,
+                adapterFactoryService);
     }
 
     protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index b74f4c6..fc64d99 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -42,7 +42,7 @@ import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedActivityDetails;
@@ -135,14 +135,14 @@ public class FeedOperations {
     private FeedOperations() {
     }
 
-    private static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed feed,
+    private static Pair<JobSpecification, ITypedAdapterFactory> buildFeedIntakeJobSpec(Feed feed,
             MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         spec.setFrameSize(metadataProvider.getApplicationContext().getCompilerProperties().getFrameSize());
-        IAdapterFactory adapterFactory;
+        ITypedAdapterFactory adapterFactory;
         IOperatorDescriptor feedIngestor;
         AlgebricksPartitionConstraint ingesterPc;
-        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t =
+        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> t =
                 metadataProvider.buildFeedIntakeRuntime(spec, feed, policyAccessor);
         feedIngestor = t.first;
         ingesterPc = t.second;
@@ -447,13 +447,13 @@ public class FeedOperations {
             MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections,
             IStatementExecutor statementExecutor, IHyracksClientConnection hcc) throws Exception {
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
-        Pair<JobSpecification, IAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
+        Pair<JobSpecification, ITypedAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
         List<JobSpecification> jobsList = new ArrayList<>();
         // TODO: Figure out a better way to handle insert/upsert per conn instead of per feed
         Boolean insertFeed = ExternalDataUtils.isInsertFeed(feed.getConfiguration());
         // Construct the ingestion Job
         JobSpecification intakeJob = intakeInfo.getLeft();
-        IAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
+        ITypedAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
         String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations();
         // Add metadata configs
         metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, Boolean.TRUE.toString());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
new file mode 100644
index 0000000..3b4cdf8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.aws;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import io.findify.s3mock.S3Mock;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+/**
+ * Runs an AWS S3 mock server and tests it as an external dataset
+ */
+@RunWith(Parameterized.class)
+public class AwsS3ExternalDatasetTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+
+    // S3 mock server
+    private static S3Mock s3MockServer;
+
+    // IMPORTANT: The following values must be used in the AWS S3 test case
+    private static S3Client client;
+    private static final String S3_MOCK_SERVER_BUCKET = "playground";
+    private static final String S3_MOCK_SERVER_BUCKET_DEFINITION = "json-data/reviews/"; // data resides here
+    private static final String S3_MOCK_SERVER_REGION = "us-west-2";
+    private static final int S3_MOCK_SERVER_PORT = 8001;
+    private static final String S3_MOCK_SERVER_HOSTNAME = "http://localhost:" + S3_MOCK_SERVER_PORT;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        final TestExecutor testExecutor = new TestExecutor();
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        setNcEndpoints(testExecutor);
+        startAwsS3MockServer();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+
+        // Shutting down S3 mock server
+        LOGGER.info("Shutting down S3 mock server and client");
+        if (client != null) {
+            client.close();
+        }
+        if (s3MockServer != null) {
+            s3MockServer.shutdown();
+        }
+        LOGGER.info("S3 mock down and client shut down successfully");
+    }
+
+    @Parameters(name = "SqlppExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("only_external_dataset.xml", "testsuite_external_dataset.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public AwsS3ExternalDatasetTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static void setNcEndpoints(TestExecutor testExecutor) {
+        final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+        final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+        final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+        for (NodeControllerService nc : ncs) {
+            final String nodeId = nc.getId();
+            final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+            int apiPort = appCtx.getExternalProperties().getNcApiPort();
+            ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+        }
+        testExecutor.setNcEndPoints(ncEndPoints);
+    }
+
+    /**
+     * Starts the AWS S3 mock server and loads some files for testing
+     */
+    private static void startAwsS3MockServer() {
+        // Starting the S3 mock server to be used instead of a real S3 server
+        LOGGER.info("Starting S3 mock server");
+        s3MockServer = new S3Mock.Builder().withPort(S3_MOCK_SERVER_PORT).withInMemoryBackend().build();
+        s3MockServer.start();
+        LOGGER.info("S3 mock server started successfully");
+
+        // Create a client and add some files to the S3 mock server
+        LOGGER.info("Creating S3 client to load initial files to S3 mock server");
+        S3ClientBuilder builder = S3Client.builder();
+        URI endpoint = URI.create(S3_MOCK_SERVER_HOSTNAME); // endpoint pointing to S3 mock server
+        builder.region(Region.of(S3_MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
+                .endpointOverride(endpoint);
+        client = builder.build();
+        LOGGER.info("Client created successfully");
+
+        // Create the bucket and upload some json files
+        prepareS3Bucket();
+    }
+
+    /**
+     * Creates a bucket and fills it with some files for testing purposes.
+     */
+    private static void prepareS3Bucket() {
+        LOGGER.info("creating bucket " + S3_MOCK_SERVER_BUCKET);
+        client.createBucket(CreateBucketRequest.builder().bucket(S3_MOCK_SERVER_BUCKET).build());
+        LOGGER.info("bucket created successfully");
+
+        LOGGER.info("Adding JSON files to the bucket");
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "0.json").build(),
+                RequestBody.fromString("{\"id\": 1, \"year\": null, \"quarter\": null, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "1.json").build(),
+                RequestBody.fromString("{\"id\": 2, \"year\": null, \"quarter\": null, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/1.json").build(),
+                RequestBody.fromString("{\"id\": 3, \"year\": 2018, \"quarter\": null, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/2.json").build(),
+                RequestBody.fromString("{\"id\": 4, \"year\": 2018, \"quarter\": null, \"review\": \"bad\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q1/1.json").build(),
+                RequestBody.fromString("{\"id\": 5, \"year\": 2018, \"quarter\": 1, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q1/2.json").build(),
+                RequestBody.fromString("{\"id\": 6, \"year\": 2018, \"quarter\": 1, \"review\": \"bad\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q2/1.json").build(),
+                RequestBody.fromString("{\"id\": 7, \"year\": 2018, \"quarter\": 2, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q2/2.json").build(),
+                RequestBody.fromString("{\"id\": 8, \"year\": 2018, \"quarter\": 2, \"review\": \"bad\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/1.json").build(),
+                RequestBody.fromString("{\"id\": 9, \"year\": 2019, \"quarter\": null, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/2.json").build(),
+                RequestBody.fromString("{\"id\": 10, \"year\": 2019, \"quarter\": null, \"review\": \"bad\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q1/1.json").build(),
+                RequestBody.fromString("{\"id\": 11, \"year\": 2019, \"quarter\": 1, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q1/2.json").build(),
+                RequestBody.fromString("{\"id\": 12, \"year\": 2019, \"quarter\": 1, \"review\": \"bad\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/1.json").build(),
+                RequestBody.fromString("{\"id\": 13, \"year\": 2019, \"quarter\": 2, \"review\": \"good\"}"));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/2.json").build(),
+                RequestBody.fromString("{\"id\": 14, \"year\": 2019, \"quarter\": 2, \"review\": \"bad\"}"));
+        LOGGER.info("Files added successfully");
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml
new file mode 100644
index 0000000..334dd52
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+  <test-group name="failed">
+  </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp
similarity index 66%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp
index 37cc1cf..9c6a994 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp
@@ -16,12 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.util.List;
+drop dataverse test if exists;
+create dataverse test;
+use test;
 
-import org.apache.asterix.external.indexing.ExternalFile;
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="json-data/reviews"),
+("format"="json")
+);
 
-public interface IIndexingAdapterFactory extends IAdapterFactory {
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp
similarity index 75%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp
index 37cc1cf..2dd9cc5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp
@@ -16,12 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.util.List;
+use test;
 
-import org.apache.asterix.external.indexing.ExternalFile;
+from test
+select value test
+order by id asc;
 
-public interface IIndexingAdapterFactory extends IAdapterFactory {
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp
similarity index 75%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp
index 37cc1cf..548e632 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp
@@ -16,12 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-
-public interface IIndexingAdapterFactory extends IAdapterFactory {
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
-}
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm
new file mode 100644
index 0000000..a7ce908
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm
@@ -0,0 +1,14 @@
+{ "id": 1, "year": null, "quarter": null, "review": "good" }
+{ "id": 2, "year": null, "quarter": null, "review": "good" }
+{ "id": 3, "year": 2018, "quarter": null, "review": "good" }
+{ "id": 4, "year": 2018, "quarter": null, "review": "bad" }
+{ "id": 5, "year": 2018, "quarter": 1, "review": "good" }
+{ "id": 6, "year": 2018, "quarter": 1, "review": "bad" }
+{ "id": 7, "year": 2018, "quarter": 2, "review": "good" }
+{ "id": 8, "year": 2018, "quarter": 2, "review": "bad" }
+{ "id": 9, "year": 2019, "quarter": null, "review": "good" }
+{ "id": 10, "year": 2019, "quarter": null, "review": "bad" }
+{ "id": 11, "year": 2019, "quarter": 1, "review": "good" }
+{ "id": 12, "year": 2019, "quarter": 1, "review": "bad" }
+{ "id": 13, "year": 2019, "quarter": 2, "review": "good" }
+{ "id": 14, "year": 2019, "quarter": 2, "review": "bad" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index 85cd967..1dc31dc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "type-name", "Value": "TweetType"  [...]
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "linkName", "Value": "localfs" },  [...]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
new file mode 100644
index 0000000..cd1fb12
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+  <test-group name="external-dataset">
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="aws/s3/000">
+        <output-dir compare="Text">aws/s3/000</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3389962..5fc1bb7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.dataflow;
 
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.api.IRequestTracker;
@@ -26,6 +27,7 @@ import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.external.IAdapterFactoryService;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.storage.ICompressionManager;
@@ -127,4 +129,18 @@ public interface ICcApplicationContext extends IApplicationContext {
      * @return the request tracker.
      */
     IRequestTracker getRequestTracker();
+
+    /**
+     * Gets the coordination service
+     *
+     * @return the coordination service
+     */
+    ICoordinationService getCoordinationService();
+
+    /**
+     * Gets the adapter factory service
+     *
+     * @return the adapter factory service
+     */
+    IAdapterFactoryService getAdapterFactoryService();
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
similarity index 82%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
index 40bc7d8..e2e7e3e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+package org.apache.asterix.common.external;
 
 import java.io.Serializable;
 import java.util.Map;
 
-import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -60,8 +59,8 @@ public interface IAdapterFactory extends Serializable {
     /**
      * Creates an instance of IDatasourceAdapter.
      *
-     * @param HyracksTaskContext
-     * @param partition
+     * @param ctx HyracksTaskContext
+     * @param partition partition number
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
@@ -77,28 +76,4 @@ public interface IAdapterFactory extends Serializable {
      */
     void configure(IServiceContext serviceContext, Map<String, String> configuration)
             throws HyracksDataException, AlgebricksException;
-
-    /**
-     * Set the expected record output type of the adapter
-     *
-     * @param outputType
-     */
-    void setOutputType(ARecordType outputType);
-
-    /**
-     * Set the expected meta output type of the adapter
-     *
-     * @param metaType
-     */
-    void setMetaType(ARecordType metaType);
-
-    /**
-     * @return the adapter record output type
-     */
-    ARecordType getOutputType();
-
-    /**
-     * @return the adapter meta output type
-     */
-    ARecordType getMetaType();
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java
similarity index 75%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java
index 37cc1cf..55e25b7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java
@@ -16,12 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+package org.apache.asterix.common.external;
 
-import java.util.List;
+@FunctionalInterface
+public interface IAdapterFactoryService {
 
-import org.apache.asterix.external.indexing.ExternalFile;
-
-public interface IIndexingAdapterFactory extends IAdapterFactory {
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
+    /**
+     * Creates and returns an adapter factory
+     *
+     * @return adapter factory
+     */
+    IAdapterFactory createAdapterFactory();
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
similarity index 97%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
index 472cdae..18f59f2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+package org.apache.asterix.common.external;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 605fbe5..30e7770 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -435,5 +435,17 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>regions</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>auth</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java
index 37cc1cf..aaf2002 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java
@@ -16,12 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+package org.apache.asterix.external.adapter.factory;
 
-import java.util.List;
+import org.apache.asterix.common.external.IAdapterFactoryService;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 
-import org.apache.asterix.external.indexing.ExternalFile;
+public class AdapterFactoryService implements IAdapterFactoryService {
 
-public interface IIndexingAdapterFactory extends IAdapterFactory {
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
+    /**
+     * Creates and returns an adapter factory
+     *
+     * @return adapter factory
+     */
+    @Override
+    public ITypedAdapterFactory createAdapterFactory() {
+        return new GenericAdapterFactory();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index fc59f68..d081e56 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -25,14 +25,14 @@ import java.util.Map;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IDataParserFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.dataset.adapter.GenericAdapter;
@@ -59,7 +59,7 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
+public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAdapterFactory {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = LogManager.getLogger();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
index 37cc1cf..8d42046 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
@@ -22,6 +22,6 @@ import java.util.List;
 
 import org.apache.asterix.external.indexing.ExternalFile;
 
-public interface IIndexingAdapterFactory extends IAdapterFactory {
+public interface IIndexingAdapterFactory extends ITypedAdapterFactory {
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java
new file mode 100644
index 0000000..13e3b34
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.asterix.common.external.IAdapterFactory;
+import org.apache.asterix.om.types.ARecordType;
+
+/**
+ * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
+ * Acts as a marker interface indicating that the implementation provides functionality
+ * for creating an adapter.
+ */
+public interface ITypedAdapterFactory extends IAdapterFactory {
+
+    /**
+     * Set the expected record output type of the adapter
+     *
+     * @param outputType
+     */
+    void setOutputType(ARecordType outputType);
+
+    /**
+     * Set the expected meta output type of the adapter
+     *
+     * @param metaType
+     */
+    void setMetaType(ARecordType metaType);
+
+    /**
+     * @return the adapter record output type
+     */
+    ARecordType getOutputType();
+
+    /**
+     * @return the adapter meta output type
+     */
+    ARecordType getMetaType();
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 2a92d40..0ab59fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -21,7 +21,7 @@ package org.apache.asterix.external.dataset.adapter;
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 916fe0a..0904384 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.dataset.adapter;
 
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
new file mode 100644
index 0000000..cfa1f6a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+
+public class AwsS3InputStream extends AsterixInputStream {
+
+    // Configuration
+    private final Map<String, String> configuration;
+
+    private final S3Client s3Client;
+
+    // File fields
+    private final List<String> filePaths;
+    private int nextFileIndex = 0;
+
+    // File reading fields
+    private InputStream inputStream;
+
+    public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) {
+        this.configuration = configuration;
+        this.filePaths = filePaths;
+
+        this.s3Client = buildAwsS3Client(configuration);
+    }
+
+    @Override
+    public int read() throws IOException {
+        throw new HyracksDataException(
+                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (inputStream == null) {
+            if (!advance()) {
+                return -1;
+            }
+        }
+
+        int result = inputStream.read(b, off, len);
+
+        // If the current file is exhausted, advance to the next one (skipping empty files), or finish up
+        // if no files are left
+        while (result < 0) {
+            if (!advance()) {
+                return -1;
+            }
+            result = inputStream.read(b, off, len);
+        }
+
+        return result;
+    }
+
+    private boolean advance() throws IOException {
+        // No files to read for this partition
+        if (filePaths == null || filePaths.isEmpty()) {
+            return false;
+        }
+
+        // Finished reading all the files
+        if (nextFileIndex == filePaths.size()) {
+            if (inputStream != null) {
+                inputStream.close();
+            }
+            return false;
+        }
+
+        // Close the current stream before going to the next one
+        if (inputStream != null) {
+            inputStream.close();
+        }
+
+        String bucket = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+        GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
+        GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
+        inputStream = s3Client.getObject(getObjectRequest);
+
+        // Current file ready, point to the next file
+        nextFileIndex++;
+        return true;
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (inputStream != null) {
+            CleanupUtils.close(inputStream, null);
+        }
+    }
+
+    /**
+     * Prepares and builds the Amazon S3 client with the provided configuration
+     *
+     * @param configuration S3 client configuration
+     *
+     * @return Amazon S3 client
+     */
+    private static S3Client buildAwsS3Client(Map<String, String> configuration) {
+        S3ClientBuilder builder = S3Client.builder();
+
+        // Credentials
+        String accessKey = configuration.get(AwsS3Constants.ACCESS_KEY_FIELD_NAME);
+        String secretKey = configuration.get(AwsS3Constants.SECRET_KEY_FIELD_NAME);
+        AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretKey);
+        builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
+
+        // Region
+        String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
+        builder.region(Region.of(region));
+
+        // Use user's endpoint if provided
+        if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
+            String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
+            builder.endpointOverride(URI.create(endPoint));
+        }
+
+        return builder.build();
+    }
+}
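
Note: the stream above is driven entirely by the configuration map (keyed by the AwsS3Constants field names introduced later in this commit) plus the list of object keys assigned to the partition. The following is a minimal, hypothetical usage sketch, not part of this patch; the bucket, keys, credentials and mock endpoint are placeholders.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;

    public class AwsS3InputStreamUsageSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> conf = new HashMap<>();
            conf.put("accessKey", "<access-key>");                 // AwsS3Constants.ACCESS_KEY_FIELD_NAME
            conf.put("secretKey", "<secret-key>");                 // AwsS3Constants.SECRET_KEY_FIELD_NAME
            conf.put("region", "us-west-2");                       // AwsS3Constants.REGION_FIELD_NAME
            conf.put("container", "my-bucket");                    // AwsS3Constants.CONTAINER_NAME_FIELD_NAME
            conf.put("serviceEndpoint", "http://localhost:8001");  // optional, e.g. an S3 mock server

            // Object keys assigned to this partition by AwsS3InputStreamFactory
            List<String> keys = Arrays.asList("data/part-0.json", "data/part-1.json");

            byte[] buffer = new byte[4096];
            try (AwsS3InputStream in = new AwsS3InputStream(conf, keys)) {
                int read;
                // read(byte[], int, int) advances transparently from one object to the next
                while ((read = in.read(buffer, 0, buffer.length)) != -1) {
                    System.out.write(buffer, 0, read);
                }
            }
            System.out.flush();
        }
    }
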
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
new file mode 100644
index 0000000..a9f7898
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class AwsS3InputStreamFactory implements IInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+
+    // Files to read from
+    private List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+
+    private transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    @Override
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) {
+        return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return partitionConstraint;
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration)
+            throws AlgebricksException, HyracksDataException {
+        this.configuration = configuration;
+        ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+
+        String container = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+
+        S3Client s3Client = buildAwsS3Client(configuration);
+
+        // Get all objects in a bucket and extract the paths to files
+        ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
+        String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME);
+        if (path != null) {
+            listObjectsBuilder.prefix(path + (path.endsWith("/") ? "" : "/"));
+        }
+        ListObjectsResponse listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
+        List<S3Object> s3Objects = listObjectsResponse.contents();
+
+        // Exclude the directories and get the files only
+        String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        List<S3Object> fileObjects = getFilesOnly(s3Objects, fileFormat);
+
+        // Partition constraints
+        partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
+        int partitionsCount = partitionConstraint.getLocations().length;
+
+        // Distribute the workload among the partitions
+        distributeWorkLoad(fileObjects, partitionsCount);
+    }
+
+    /**
+     * AWS S3 returns all objects as keys and does not differentiate between folders and files. An object is
+     * treated as a file if its key ends with the file extension expected for the configured format.
+     *
+     * @param s3Objects List of objects returned by S3
+     * @param fileFormat Format of the files to include (e.g. "json")
+     *
+     * @return A list of S3 objects that point to files only
+     * @throws HyracksDataException if no file extension is known for the provided format
+     */
+    private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws HyracksDataException {
+        List<S3Object> filesOnly = new ArrayList<>();
+        String fileExtension = getFileExtension(fileFormat);
+        if (fileExtension == null) {
+            throw HyracksDataException.create(ErrorCode.INVALID_FORMAT);
+        }
+
+        s3Objects.stream().filter(object -> object.key().endsWith(fileExtension)).forEach(filesOnly::add);
+
+        return filesOnly;
+    }
+
+    /**
+     * To utilize the parallelism efficiently, the workload is distributed among the partitions based on file size:
+     * each file is assigned to the partition with the smallest total size so far.
+     *
+     * Example:
+     * File1 1 MB, File2 300 KB, File3 300 KB, File4 300 KB
+     *
+     * Distribution:
+     * Partition1: [File1]
+     * Partition2: [File2, File3, File4]
+     *
+     * @param fileObjects AWS S3 file objects
+     * @param partitionsCount Partitions count
+     */
+    private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) {
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+        }
+
+        for (S3Object object : fileObjects) {
+            PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
+            smallest.addFilePath(object.key(), object.size());
+        }
+    }
+
+    /**
+     * Finds the smallest workload and returns it
+     *
+     * @return the smallest workload
+     */
+    private PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
+        PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
+        for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
+            // An empty partition is the smallest by definition; pick it right away
+            if (partition.getTotalSize() == 0) {
+                smallest = partition;
+                break;
+            }
+            if (partition.getTotalSize() < smallest.getTotalSize()) {
+                smallest = partition;
+            }
+        }
+
+        return smallest;
+    }
+
+    /**
+     * Prepares and builds the Amazon S3 client with the provided configuration
+     *
+     * @param configuration S3 client configuration
+     *
+     * @return Amazon S3 client
+     */
+    private static S3Client buildAwsS3Client(Map<String, String> configuration) {
+        S3ClientBuilder builder = S3Client.builder();
+
+        // Credentials
+        String accessKey = configuration.get(AwsS3Constants.ACCESS_KEY_FIELD_NAME);
+        String secretKey = configuration.get(AwsS3Constants.SECRET_KEY_FIELD_NAME);
+        AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretKey);
+        builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
+
+        // Region
+        String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
+        builder.region(Region.of(region));
+
+        // Use user's endpoint if provided
+        if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
+            String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
+            builder.endpointOverride(URI.create(endPoint));
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Returns the file extension for the provided file format.
+     *
+     * @param format file format
+     *
+     * @return the file extension for the provided file format, or null if the format is not supported
+     */
+    private String getFileExtension(String format) {
+        switch (format.toLowerCase()) {
+            case "json":
+                return ".json";
+            default:
+                return null;
+        }
+    }
+
+    private static class PartitionWorkLoadBasedOnSize implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private List<String> filePaths = new ArrayList<>();
+        private long totalSize = 0;
+
+        PartitionWorkLoadBasedOnSize() {
+        }
+
+        public List<String> getFilePaths() {
+            return filePaths;
+        }
+
+        public void addFilePath(String filePath, long size) {
+            this.filePaths.add(filePath);
+            this.totalSize += size;
+        }
+
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        @Override
+        public String toString() {
+            return "Files: " + filePaths.size() + ", Total Size: " + totalSize;
+        }
+    }
+}
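
The workload distribution above is a greedy "smallest partition first" assignment. As a standalone illustration only (not part of this patch), the same idea boils down to:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class GreedyDistributionSketch {

        static class Bucket {
            final List<String> keys = new ArrayList<>();
            long totalSize;
        }

        // Assign each (key, size) pair to the bucket with the smallest total size so far
        static List<Bucket> distribute(List<String> keys, List<Long> sizes, int partitions) {
            List<Bucket> buckets = new ArrayList<>();
            for (int i = 0; i < partitions; i++) {
                buckets.add(new Bucket());
            }
            for (int i = 0; i < keys.size(); i++) {
                Bucket smallest = buckets.stream().min(Comparator.comparingLong(b -> b.totalSize)).get();
                smallest.keys.add(keys.get(i));
                smallest.totalSize += sizes.get(i);
            }
            return buckets;
        }

        public static void main(String[] args) {
            // The example from the Javadoc above: File1 1 MB, File2-4 300 KB each, two partitions
            List<Bucket> result = distribute(Arrays.asList("File1", "File2", "File3", "File4"),
                    Arrays.asList(1_000_000L, 300_000L, 300_000L, 300_000L), 2);
            // Expected: [File1] and [File2, File3, File4]
            result.forEach(b -> System.out.println(b.keys + " total=" + b.totalSize));
        }
    }

Like the factory, this does not sort the files by size first, so the split is approximate rather than optimal; for modest file counts the simplicity is a reasonable trade-off.
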
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
new file mode 100644
index 0000000..e78783a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AwsS3ReaderFactory extends StreamRecordReaderFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final List<String> recordReaderNames =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+        return streamFactory.getPartitionConstraint();
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration)
+            throws AlgebricksException, HyracksDataException {
+        this.configuration = configuration;
+
+        // Stream factory
+        streamFactory = new AwsS3InputStreamFactory();
+        streamFactory.configure(ctx, configuration);
+
+        // Record reader
+        recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
+    }
+
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            StreamRecordReader streamRecordReader =
+                    (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
+            streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
+            return streamRecordReader;
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
+                | NoSuchMethodException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
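
AwsS3ReaderFactory becomes discoverable through the META-INF/services registration added at the end of this commit, so existing provider code can resolve it by the adapter name "S3". A hypothetical lookup sketch, assuming IRecordReaderFactory exposes getRecordReaderNames() as the override above suggests:

    import java.util.ServiceLoader;

    import org.apache.asterix.external.api.IRecordReaderFactory;

    public class ReaderFactoryLookupSketch {
        public static void main(String[] args) {
            // Iterate all registered record reader factories and pick the one advertising "S3"
            for (IRecordReaderFactory factory : ServiceLoader.load(IRecordReaderFactory.class)) {
                if (factory.getRecordReaderNames().contains("S3")) {
                    System.out.println("Found S3 reader factory: " + factory.getClass().getName());
                }
            }
        }
    }
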
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 081d49e..4fd5151 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.operators;
 
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -37,10 +37,10 @@ public class ExternalScanOperatorDescriptor extends AbstractSingleActivityOperat
 
     private static final long serialVersionUID = 1L;
 
-    private IAdapterFactory adapterFactory;
+    private ITypedAdapterFactory adapterFactory;
 
     public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
-            IAdapterFactory dataSourceAdapterFactory) {
+            ITypedAdapterFactory dataSourceAdapterFactory) {
         super(spec, 0, 1);
         outRecDescs[0] = rDesc;
         this.adapterFactory = dataSourceAdapterFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 7a0341a..d63e8a8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -25,7 +25,7 @@ import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.om.types.ARecordType;
@@ -57,7 +57,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
     private final FeedPolicyAccessor policyAccessor;
     private final ARecordType adapterOutputType;
     /** The adaptor factory that is used to create an instance of the feed adaptor **/
-    private IAdapterFactory adaptorFactory;
+    private ITypedAdapterFactory adaptorFactory;
     /** The library that contains the adapter in use. **/
     private String adaptorLibraryName;
     /**
@@ -68,7 +68,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
     /** The configuration parameters associated with the adapter. **/
     private Map<String, String> adaptorConfiguration;
 
-    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory,
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, ITypedAdapterFactory adapterFactory,
             ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
         super(spec, 0, 1);
         this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName());
@@ -100,15 +100,15 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
         return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, recordDescProvider, this);
     }
 
-    private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
-        IAdapterFactory adapterFactory;
+    private ITypedAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
+        ITypedAdapterFactory adapterFactory;
         INcApplicationContext runtimeCtx =
                 (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
         ILibraryManager libraryManager = runtimeCtx.getLibraryManager();
         ClassLoader classLoader = libraryManager.getLibraryClassLoader(feedId.getDataverse(), adaptorLibraryName);
         if (classLoader != null) {
             try {
-                adapterFactory = (IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance());
+                adapterFactory = (ITypedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance());
                 adapterFactory.setOutputType(adapterOutputType);
                 adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration);
             } catch (Exception e) {
@@ -128,11 +128,11 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
         return feedId;
     }
 
-    public IAdapterFactory getAdaptorFactory() {
+    public ITypedAdapterFactory getAdaptorFactory() {
         return this.adaptorFactory;
     }
 
-    public void setAdaptorFactory(IAdapterFactory factory) {
+    public void setAdaptorFactory(ITypedAdapterFactory factory) {
         this.adaptorFactory = factory;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 98f75df..7002a23 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -50,7 +50,7 @@ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePush
     private final FeedAdapter adapter;
     private boolean poisoned = false;
 
-    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory,
+    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, ITypedAdapterFactory adapterFactory,
             int partition, IRecordDescriptorProvider recordDescProvider,
             FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws HyracksDataException {
         super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 5740143..414c460 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -21,10 +21,11 @@ package org.apache.asterix.external.provider;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
 import org.apache.asterix.om.types.ARecordType;
@@ -39,11 +40,13 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public class AdapterFactoryProvider {
 
     // Adapters
-    public static IAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
+    public static ITypedAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
             Map<String, String> configuration, ARecordType itemType, ARecordType metaType)
             throws HyracksDataException, AlgebricksException {
         ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
-        GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
+        ICcApplicationContext context = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        ITypedAdapterFactory adapterFactory =
+                (ITypedAdapterFactory) context.getAdapterFactoryService().createAdapterFactory();
         adapterFactory.setOutputType(itemType);
         adapterFactory.setMetaType(metaType);
         adapterFactory.configure(serviceCtx, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 8024dc4..2a2289c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -53,6 +53,7 @@ public class DatasourceFactoryProvider {
 
     public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager,
             Map<String, String> configuration) throws HyracksDataException, AsterixException {
+        // Determine the data source type and dispatch to the matching factory
         if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
             String reader = configuration.get(ExternalDataConstants.KEY_READER);
             return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
index e222e99..77cbb96 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -60,8 +60,11 @@ public class ExternalDataCompatibilityUtils {
     }
 
     public static void prepare(String adapterName, Map<String, String> configuration) {
-        if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
-            configuration.put(ExternalDataConstants.KEY_READER, adapterName);
+        // For external datasets, the adapter name can carry the link name; always add it to the configuration
+        configuration.put(ExternalDataConstants.KEY_LINK_NAME, adapterName);
+
+        if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
+            configuration.put(ExternalDataConstants.KEY_READER, adapterName);
         }
         if (!configuration.containsKey(ExternalDataConstants.KEY_PARSER)) {
             if (configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 729215e..e44144a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -114,6 +114,7 @@ public class ExternalDataConstants {
     public static final String KEY_ADAPTER_NAME_SOCKET = "socket";
     public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = "socket_adapter";
     public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
+    public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3";
 
     /**
      * HDFS class names
@@ -229,4 +230,26 @@ public class ExternalDataConstants {
     public static final String FORMAT_CSV = "csv";
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
+
+    // TODO(Hussain): Move link related items to a different place
+    /**
+     * Common external link fields
+     */
+    public static final String KEY_DATAVERSE_NAME = "dataverseName";
+    public static final String KEY_LINK_NAME = "linkName";
+    public static final String KEY_LINK_TYPE = "linkType";
+    public static final String[] KEY_EXTERNAL_DATASET_REQUIRED_CONNECTION_PARAMETERS =
+            new String[] { KEY_DATAVERSE_NAME, KEY_LINK_NAME, KEY_LINK_TYPE };
+
+    public static class AwsS3Constants {
+        public static final String REGION_FIELD_NAME = "region";
+        public static final String ACCESS_KEY_FIELD_NAME = "accessKey";
+        public static final String SECRET_KEY_FIELD_NAME = "secretKey";
+        public static final String CONTAINER_NAME_FIELD_NAME = "container";
+        public static final String DEFINITION_FIELD_NAME = "definition";
+        public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+        public static final String[] REQUIRED_LINK_PARAMETERS =
+                new String[] { ACCESS_KEY_FIELD_NAME, SECRET_KEY_FIELD_NAME, REGION_FIELD_NAME };
+        public static final String[] OPTIONAL_LINK_PARAMETERS = new String[] { SERVICE_END_POINT_FIELD_NAME };
+    }
 }
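
The AwsS3Constants block above defines which link parameters are required (accessKey, secretKey, region) and which are optional (serviceEndpoint). A hypothetical validation helper, not part of this patch, might use them as follows; the exception type and message are placeholders:

    import java.util.Map;

    public class S3LinkParameterCheckSketch {

        // Mirrors AwsS3Constants.REQUIRED_LINK_PARAMETERS
        private static final String[] REQUIRED = { "accessKey", "secretKey", "region" };

        static void checkRequired(Map<String, String> configuration) {
            for (String key : REQUIRED) {
                String value = configuration.get(key);
                if (value == null || value.isEmpty()) {
                    throw new IllegalArgumentException("Parameter '" + key + "' is required for an S3 link");
                }
            }
        }
    }
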
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 0d96658..fd3e473 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -20,3 +20,4 @@ org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory
 org.apache.asterix.external.input.HDFSDataSourceFactory
 org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
 org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
+org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 8ee8a57..a947c7e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -25,9 +25,9 @@ import java.util.Map;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.dataflow.TupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.om.types.ARecordType;
@@ -41,7 +41,7 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.std.file.ITupleParser;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class TestTypedAdapterFactory implements IAdapterFactory {
+public class TestTypedAdapterFactory implements ITypedAdapterFactory {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index 0a17b24..45fc33a 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -44,7 +44,7 @@ public class DatasetDecl extends AbstractStatement {
     protected final DatasetType datasetType;
     protected final IDatasetDetailsDecl datasetDetailsDecl;
     protected final Map<String, String> hints;
-    private final AdmObjectNode withObjectNode;
+    private AdmObjectNode withObjectNode;
     protected final boolean ifNotExists;
 
     public DatasetDecl(Identifier dataverse, Identifier name, Identifier itemTypeDataverse, Identifier itemTypeName,
@@ -67,7 +67,7 @@ public class DatasetDecl extends AbstractStatement {
         }
         this.nodegroupName = nodeGroupName;
         this.hints = hints;
-        this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord);
+        this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord, datasetType);
         this.ifNotExists = ifNotExists;
         this.datasetType = datasetType;
         this.datasetDetailsDecl = idd;
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
index a26a638..52285d9 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.lang.common.util;
 
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.expression.RecordConstructor;
 import org.apache.asterix.object.base.AdmObjectNode;
@@ -60,14 +61,21 @@ public class DatasetDeclParametersUtil {
     private DatasetDeclParametersUtil() {
     }
 
-    public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord) throws CompilationException {
+    public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord,
+            DatasetConfig.DatasetType datasetType) throws CompilationException {
         if (withRecord == null) {
             return EMPTY_WITH_OBJECT;
         }
-        final ConfigurationTypeValidator validator = new ConfigurationTypeValidator();
-        final AdmObjectNode node = ExpressionUtils.toNode(withRecord);
-        validator.validateType(WITH_OBJECT_TYPE, node);
-        return node;
+
+        // Handle based on dataset type
+        if (datasetType == DatasetConfig.DatasetType.INTERNAL) {
+            final ConfigurationTypeValidator validator = new ConfigurationTypeValidator();
+            final AdmObjectNode node = ExpressionUtils.toNode(withRecord);
+            validator.validateType(WITH_OBJECT_TYPE, node);
+            return node;
+        } else {
+            return ExpressionUtils.toNode(withRecord);
+        }
     }
 
     private static ARecordType getWithObjectType() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 49fffe6..3412941 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -35,13 +35,13 @@ import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.MetadataException;
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.IDatasetDetails;
@@ -294,7 +294,8 @@ public class MetadataBootstrap {
 
     private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws AlgebricksException {
         try {
-            String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias();
+            String adapterName =
+                    ((ITypedAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias();
             return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName),
                     adapterFactoryClassName, IDataSourceAdapter.AdapterType.INTERNAL);
         } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 62cce05..07bbc57 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -111,7 +111,7 @@ public class DatasetDataSource extends DataSource {
                         externalDataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
 
                 ExternalDatasetDetails edd = (ExternalDatasetDetails) externalDataset.getDatasetDetails();
-                IAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
+                ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
                         edd.getAdapter(), edd.getProperties(), (ARecordType) itemType, null);
                 return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
             case INTERNAL:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 3460a46..c2983af 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
@@ -137,7 +137,7 @@ public class LoadableDataSource extends DataSource {
         }
         LoadableDataSource alds = (LoadableDataSource) dataSource;
         ARecordType itemType = (ARecordType) alds.getLoadedType();
-        IAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
+        ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
                 alds.getAdapter(), alds.getAdapterProperties(), itemType, null);
         RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
         return metadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 0a72ceb..5bdf2a7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -39,6 +39,7 @@ import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.metadata.LockList;
 import org.apache.asterix.common.storage.ICompressionManager;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
@@ -48,8 +49,7 @@ import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
@@ -416,7 +416,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
-            JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
+            JobSpecification jobSpec, ITypedAdapterFactory adapterFactory, RecordDescriptor rDesc)
             throws AlgebricksException {
         ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
         try {
@@ -430,14 +430,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
     }
 
-    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
+    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception {
-        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
+        Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
         factoryOutput =
                 FeedMetadataUtil.getFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx, getApplicationContext());
         ARecordType recordType =
                 FeedMetadataUtil.getOutputType(feed, feed.getConfiguration().get(ExternalDataConstants.KEY_TYPE_NAME));
-        IAdapterFactory adapterFactory = factoryOutput.first;
+        ITypedAdapterFactory adapterFactory = factoryOutput.first;
         FeedIntakeOperatorDescriptor feedIngestor = null;
         switch (factoryOutput.third) {
             case INTERNAL:
@@ -775,11 +775,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return numElementsHint / numPartitions;
     }
 
-    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
+    protected ITypedAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
             Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AlgebricksException {
         try {
             configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
-            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(
+            ITypedAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(
                     getApplicationContext().getServiceContext(), adapterName, configuration, itemType, metaType);
 
             // check to see if dataset is indexed
@@ -922,7 +922,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory) throws AlgebricksException {
+            JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
             throw new AlgebricksException("Can only scan datasets of records.");
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
index b72c058..c29fb93 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.metadata.entities;
 
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.metadata.MetadataCache;
 import org.apache.asterix.metadata.api.IMetadataEntity;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 8f630cf..9e65c08 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -24,7 +24,7 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.util.Calendar;
 
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 3ae0fec..7ed53e4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -28,9 +28,9 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.MetadataException;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
@@ -117,20 +117,20 @@ public class FeedMetadataUtil {
                 adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
             }
             AdapterType adapterType;
-            IAdapterFactory adapterFactory;
+            ITypedAdapterFactory adapterFactory;
             if (adapterEntity != null) {
                 adapterType = adapterEntity.getType();
                 String adapterFactoryClassname = adapterEntity.getClassname();
                 switch (adapterType) {
                     case INTERNAL:
-                        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                        adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
                         break;
                     case EXTERNAL:
                         String[] anameComponents = adapterName.split("#");
                         String libraryName = anameComponents[0];
                         ClassLoader cl =
                                 appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), libraryName);
-                        adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+                        adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                         break;
                     default:
                         throw new AsterixException("Unknown Adapter type " + adapterType);
@@ -165,17 +165,17 @@ public class FeedMetadataUtil {
     }
 
     @SuppressWarnings("rawtypes")
-    public static Triple<IAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed,
+    public static Triple<ITypedAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed,
             FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx, ICcApplicationContext appCtx)
             throws AlgebricksException {
         // This method needs to be re-visited
         String adapterName = null;
         DatasourceAdapter adapterEntity = null;
         String adapterFactoryClassname = null;
-        IAdapterFactory adapterFactory = null;
+        ITypedAdapterFactory adapterFactory = null;
         ARecordType adapterOutputType = null;
         ARecordType metaType = null;
-        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null;
+        Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null;
         IDataSourceAdapter.AdapterType adapterType = null;
         try {
             Map<String, String> configuration = feed.getConfiguration();
@@ -196,14 +196,14 @@ public class FeedMetadataUtil {
                 adapterFactoryClassname = adapterEntity.getClassname();
                 switch (adapterType) {
                     case INTERNAL:
-                        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                        adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
                         break;
                     case EXTERNAL:
                         String[] anameComponents = adapterName.split("#");
                         String libraryName = anameComponents[0];
                         ClassLoader cl =
                                 appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), libraryName);
-                        adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+                        adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                         break;
                     default:
                         throw new AsterixException("Unknown Adapter type " + adapterType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 47db3b0..c1d8f42 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -30,7 +30,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
 import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
@@ -254,7 +254,7 @@ public class ExternalIndexingOperations {
             throws HyracksDataException, AlgebricksException {
         ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
         Map<String, String> configuration = externalDatasetDetails.getProperties();
-        IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
+        ITypedAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
                 metadataProvider.getApplicationContext().getServiceContext(), externalDatasetDetails.getAdapter(),
                 configuration, (ARecordType) itemType, files, true, null);
         ExternalScanOperatorDescriptor scanOp =
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index a0b10c6..3366ac1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -45,6 +45,7 @@ import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IAdapterFactoryService;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -98,14 +99,15 @@ public class CcApplicationContext implements ICcApplicationContext {
     private final IReceptionist receptionist;
     private final IRequestTracker requestTracker;
     private final IConfigValidator configValidator;
+    private final IAdapterFactoryService adapterFactoryService;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
             IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
             IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
             IMetadataLockManager mdLockManager, IReceptionistFactory receptionistFactory,
-            IConfigValidatorFactory configValidatorFactory, Object extensionManager)
-            throws AlgebricksException, IOException {
+            IConfigValidatorFactory configValidatorFactory, Object extensionManager,
+            IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.libraryManager = libraryManager;
@@ -139,6 +141,7 @@ public class CcApplicationContext implements ICcApplicationContext {
         receptionist = receptionistFactory.create();
         requestTracker = new RequestTracker(this);
         configValidator = configValidatorFactory.create();
+        this.adapterFactoryService = adapterFactoryService;
     }
 
     @Override
@@ -306,4 +309,9 @@ public class CcApplicationContext implements ICcApplicationContext {
     public IRequestTracker getRequestTracker() {
         return requestTracker;
     }
+
+    @Override
+    public IAdapterFactoryService getAdapterFactoryService() {
+        return adapterFactoryService;
+    }
 }
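
For readers skimming the hunk above: the cluster controller context now carries an IAdapterFactoryService, injected through the constructor at bootstrap and exposed read-only via getAdapterFactoryService(), so the concrete service can be swapped per deployment. The sketch below only illustrates that injection pattern; the interface body is left empty here as an assumption, and the real contract lives in the IAdapterFactoryService type referenced in the diff.

    /** Illustrative sketch of constructor-injecting a pluggable service into an application context. */
    public final class AppContextSketch {

        /** Empty stand-in for the real IAdapterFactoryService contract (assumption for this example). */
        public interface AdapterFactoryServiceSketch {
        }

        private final AdapterFactoryServiceSketch adapterFactoryService;

        public AppContextSketch(AdapterFactoryServiceSketch adapterFactoryService) {
            // Supplied once at bootstrap, never reassigned, as in CcApplicationContext above.
            this.adapterFactoryService = adapterFactoryService;
        }

        public AdapterFactoryServiceSketch getAdapterFactoryService() {
            return adapterFactoryService;
        }
    }
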
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 409b13d..7acfc04 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -172,6 +172,11 @@
               <gav>io.netty:netty-all:4.1.46.Final</gav>
               <noticeUrl>https://raw.githubusercontent.com/netty/netty/netty-4.1.46.Final/NOTICE.txt</noticeUrl>
             </override>
+            <override>
+              <gav>org.reactivestreams:reactive-streams:1.0.2</gav>
+              <noticeUrl>https://raw.githubusercontent.com/reactive-streams/reactive-streams-jvm/v1.0.2/COPYING.txt</noticeUrl>
+              <url>https://raw.githubusercontent.com/reactive-streams/reactive-streams-jvm/v1.0.2/LICENSE.txt</url>
+            </override>
           </overrides>
           <licenses>
             <license>
@@ -205,6 +210,7 @@
                 <aliasUrl>http://www.apache.org/licenses/LICENSE-2.0</aliasUrl>
                 <aliasUrl>https://www.apache.org/licenses/LICENSE-2.0.txt</aliasUrl>
                 <aliasUrl>http://www.apache.org/licenses/LICENSE-2.0.html</aliasUrl>
+                <aliasUrl>https://aws.amazon.com/apache2.0</aliasUrl>
               </aliasUrls>
               <metric>1</metric>
             </license>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 25550bf..76ec3d2 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -81,6 +81,7 @@
     <hyracks.version>0.3.5-SNAPSHOT</hyracks.version>
     <hadoop.version>2.8.5</hadoop.version>
     <jacoco.version>0.7.6.201602180812</jacoco.version>
+    <awsjavasdk.version>2.10.83</awsjavasdk.version>
 
     <implementation.title>Apache AsterixDB - ${project.name}</implementation.title>
     <implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -1340,6 +1341,84 @@
         <artifactId>reflections</artifactId>
         <version>0.9.12</version>
       </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>s3</artifactId>
+        <version>${awsjavasdk.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http2</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>regions</artifactId>
+        <version>${awsjavasdk.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>auth</artifactId>
+        <version>${awsjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>sdk-core</artifactId>
+        <version>${awsjavasdk.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <!-- Mock for AWS S3 -->
+      <dependency>
+        <groupId>io.findify</groupId>
+        <artifactId>s3mock_2.12</artifactId>
+        <version>0.2.5</version>
+      </dependency>
+      <!-- Needed for the s3 mock -->
+      <dependency>
+        <groupId>com.typesafe.akka</groupId>
+        <artifactId>akka-http-core_2.12</artifactId>
+        <version>10.1.0</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt
new file mode 100644
index 0000000..1625c17
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt
@@ -0,0 +1,121 @@
+Creative Commons Legal Code
+
+CC0 1.0 Universal
+
+    CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
+    LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
+    ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
+    INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
+    REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
+    PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
+    THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
+    HEREUNDER.
+
+Statement of Purpose
+
+The laws of most jurisdictions throughout the world automatically confer
+exclusive Copyright and Related Rights (defined below) upon the creator
+and subsequent owner(s) (each and all, an "owner") of an original work of
+authorship and/or a database (each, a "Work").
+
+Certain owners wish to permanently relinquish those rights to a Work for
+the purpose of contributing to a commons of creative, cultural and
+scientific works ("Commons") that the public can reliably and without fear
+of later claims of infringement build upon, modify, incorporate in other
+works, reuse and redistribute as freely as possible in any form whatsoever
+and for any purposes, including without limitation commercial purposes.
+These owners may contribute to the Commons to promote the ideal of a free
+culture and the further production of creative, cultural and scientific
+works, or to gain reputation or greater distribution for their Work in
+part through the use and efforts of others.
+
+For these and/or other purposes and motivations, and without any
+expectation of additional consideration or compensation, the person
+associating CC0 with a Work (the "Affirmer"), to the extent that he or she
+is an owner of Copyright and Related Rights in the Work, voluntarily
+elects to apply CC0 to the Work and publicly distribute the Work under its
+terms, with knowledge of his or her Copyright and Related Rights in the
+Work and the meaning and intended legal effect of CC0 on those rights.
+
+1. Copyright and Related Rights. A Work made available under CC0 may be
+protected by copyright and related or neighboring rights ("Copyright and
+Related Rights"). Copyright and Related Rights include, but are not
+limited to, the following:
+
+  i. the right to reproduce, adapt, distribute, perform, display,
+     communicate, and translate a Work;
+ ii. moral rights retained by the original author(s) and/or performer(s);
+iii. publicity and privacy rights pertaining to a person's image or
+     likeness depicted in a Work;
+ iv. rights protecting against unfair competition in regards to a Work,
+     subject to the limitations in paragraph 4(a), below;
+  v. rights protecting the extraction, dissemination, use and reuse of data
+     in a Work;
+ vi. database rights (such as those arising under Directive 96/9/EC of the
+     European Parliament and of the Council of 11 March 1996 on the legal
+     protection of databases, and under any national implementation
+     thereof, including any amended or successor version of such
+     directive); and
+vii. other similar, equivalent or corresponding rights throughout the
+     world based on applicable law or treaty, and any national
+     implementations thereof.
+
+2. Waiver. To the greatest extent permitted by, but not in contravention
+of, applicable law, Affirmer hereby overtly, fully, permanently,
+irrevocably and unconditionally waives, abandons, and surrenders all of
+Affirmer's Copyright and Related Rights and associated claims and causes
+of action, whether now known or unknown (including existing as well as
+future claims and causes of action), in the Work (i) in all territories
+worldwide, (ii) for the maximum duration provided by applicable law or
+treaty (including future time extensions), (iii) in any current or future
+medium and for any number of copies, and (iv) for any purpose whatsoever,
+including without limitation commercial, advertising or promotional
+purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
+member of the public at large and to the detriment of Affirmer's heirs and
+successors, fully intending that such Waiver shall not be subject to
+revocation, rescission, cancellation, termination, or any other legal or
+equitable action to disrupt the quiet enjoyment of the Work by the public
+as contemplated by Affirmer's express Statement of Purpose.
+
+3. Public License Fallback. Should any part of the Waiver for any reason
+be judged legally invalid or ineffective under applicable law, then the
+Waiver shall be preserved to the maximum extent permitted taking into
+account Affirmer's express Statement of Purpose. In addition, to the
+extent the Waiver is so judged Affirmer hereby grants to each affected
+person a royalty-free, non transferable, non sublicensable, non exclusive,
+irrevocable and unconditional license to exercise Affirmer's Copyright and
+Related Rights in the Work (i) in all territories worldwide, (ii) for the
+maximum duration provided by applicable law or treaty (including future
+time extensions), (iii) in any current or future medium and for any number
+of copies, and (iv) for any purpose whatsoever, including without
+limitation commercial, advertising or promotional purposes (the
+"License"). The License shall be deemed effective as of the date CC0 was
+applied by Affirmer to the Work. Should any part of the License for any
+reason be judged legally invalid or ineffective under applicable law, such
+partial invalidity or ineffectiveness shall not invalidate the remainder
+of the License, and in such case Affirmer hereby affirms that he or she
+will not (i) exercise any of his or her remaining Copyright and Related
+Rights in the Work or (ii) assert any associated claims and causes of
+action with respect to the Work, in either case contrary to Affirmer's
+express Statement of Purpose.
+
+4. Limitations and Disclaimers.
+
+ a. No trademark or patent rights held by Affirmer are waived, abandoned,
+    surrendered, licensed or otherwise affected by this document.
+ b. Affirmer offers the Work as-is and makes no representations or
+    warranties of any kind concerning the Work, express, implied,
+    statutory or otherwise, including without limitation warranties of
+    title, merchantability, fitness for a particular purpose, non
+    infringement, or the absence of latent or other defects, accuracy, or
+    the present or absence of errors, whether or not discoverable, all to
+    the greatest extent permissible under applicable law.
+ c. Affirmer disclaims responsibility for clearing rights of other persons
+    that may apply to the Work or any use thereof, including without
+    limitation any person's Copyright and Related Rights in the Work.
+    Further, Affirmer disclaims responsibility for obtaining any necessary
+    consents, permissions or other rights required for any use of the
+    Work.
+ d. Affirmer understands and acknowledges that Creative Commons is not a
+    party to this document and has no duty or obligation with respect to
+    this CC0 or use of the Work.
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt
new file mode 100644
index 0000000..eadae05
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt
@@ -0,0 +1,8 @@
+Licensed under Public Domain (CC0)
+
+To the extent possible under law, the person who associated CC0 with
+this code has waived all copyright and related or neighboring
+rights to this code.
+
+You should have received a copy of the CC0 legalcode along with this
+work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
\ No newline at end of file