You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the hyperlink is not preserved in this copy).
Posted to commits@asterixdb.apache.org by al...@apache.org on 2023/03/30 20:05:25 UTC

[asterixdb] branch master updated: [ASTERIXDB-3158][HYR]: Make IOManager support writing to cloud storage

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c9678e4b45 [ASTERIXDB-3158][HYR]: Make IOManager support writing to cloud storage
c9678e4b45 is described below

commit c9678e4b45205791dbdd0fb9b20927e995a7ccf2
Author: Hussain Towaileb <Hu...@couchbase.com>
AuthorDate: Tue Mar 28 21:51:07 2023 +0300

    [ASTERIXDB-3158][HYR]: Make IOManager support writing to cloud storage
    
    Change-Id: I362a3cbfcd3fa99f321467cb72d74d388fcdee2b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17453
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../java/org/apache/hyracks/api/io/IIOManager.java |  61 +++--
 .../java/org/apache/hyracks/api/util/IoUtil.java   |   5 +
 .../hyracks-control/hyracks-control-nc/pom.xml     |  31 +++
 .../apache/hyracks/control/nc/io/FileHandle.java   |   2 +-
 .../apache/hyracks/control/nc/io/IOManager.java    | 128 ++++++++-
 .../control/nc/io/cloud/CloudFileHandle.java       |  49 ++++
 .../control/nc/io/cloud/CloudIOManager.java        | 289 +++++++++++++++++++++
 .../nc/io/cloud/CloudResettableInputStream.java    | 153 +++++++++++
 .../control/nc/io/cloud/LocalCacheUtil.java        | 102 ++++++++
 .../control/nc/io/cloud/WriteBufferProvider.java   |  45 ++++
 .../nc/io/cloud/clients/CloudClientProvider.java   |  56 ++++
 .../nc/io/cloud/clients/ICloudBufferedWriter.java  |  49 ++++
 .../control/nc/io/cloud/clients/ICloudClient.java  | 128 +++++++++
 .../nc/io/cloud/clients/NoOpCloudClient.java       |  86 ++++++
 .../io/cloud/clients/aws/s3/S3BufferedWriter.java  | 121 +++++++++
 .../nc/io/cloud/clients/aws/s3/S3CloudClient.java  | 244 +++++++++++++++++
 .../nc/io/cloud/clients/aws/s3/S3Utils.java        |  75 ++++++
 .../clients/azure/blob/AzureBlobCloudClient.java   |  86 ++++++
 .../io/cloud/clients/gcp/gcs/GCSCloudClient.java   |  86 ++++++
 .../org/apache/hyracks/control/nc/lsm/LSMTest.java | 119 +++++++++
 .../hyracks/control/nc/lsm/aws/s3/LSMS3Test.java   |  93 +++++++
 hyracks-fullstack/pom.xml                          |  33 +++
 22 files changed, 2023 insertions(+), 18 deletions(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 75f484818b..c3ff70ac22 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -19,34 +19,39 @@
 package org.apache.hyracks.api.io;
 
 import java.io.Closeable;
+import java.io.FilenameFilter;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IIOManager extends Closeable {
-    public enum FileReadWriteMode {
+
+    enum FileReadWriteMode {
         READ_ONLY,
         READ_WRITE
     }
 
-    public enum FileSyncMode {
+    enum FileSyncMode {
         METADATA_SYNC_DATA_SYNC,
         METADATA_ASYNC_DATA_SYNC,
         METADATA_ASYNC_DATA_ASYNC
     }
 
-    public List<IODeviceHandle> getIODevices();
+    List<IODeviceHandle> getIODevices();
 
-    public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+    IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
             throws HyracksDataException;
 
-    public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+    int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
-    public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException;
+    long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException;
 
-    public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+    int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
     IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
@@ -54,17 +59,17 @@ public interface IIOManager extends Closeable {
 
     IAsyncRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
-    public void close(IFileHandle fHandle) throws HyracksDataException;
+    void close(IFileHandle fHandle) throws HyracksDataException;
 
-    public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+    void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
 
-    public void truncate(IFileHandle fileHandle, long size) throws HyracksDataException;
+    void truncate(IFileHandle fileHandle, long size) throws HyracksDataException;
 
-    public long getSize(IFileHandle fileHandle);
+    long getSize(IFileHandle fileHandle);
 
-    public WritableByteChannel newWritableChannel(IFileHandle fileHandle);
+    WritableByteChannel newWritableChannel(IFileHandle fileHandle);
 
-    public void deleteWorkspaceFiles() throws HyracksDataException;
+    void deleteWorkspaceFiles() throws HyracksDataException;
 
     /**
      * @param ioDeviceId
@@ -85,11 +90,10 @@ public interface IIOManager extends Closeable {
     /**
      * Gets a file reference from an absolute path
      *
-     * @deprecated
-     *             use getFileRef(int ioDeviceId, String path) instead
      * @param path
      * @return A file reference based on the mounting point of {@code ioDeviceId} and the passed {@code relativePath}
      * @throws HyracksDataException
+     * @deprecated use getFileRef(int ioDeviceId, String path) instead
      */
     @Deprecated
     FileReference resolveAbsolutePath(String path) throws HyracksDataException;
@@ -109,4 +113,31 @@ public interface IIOManager extends Closeable {
      * @return the total disk usage in bytes
      */
     long getTotalDiskUsage();
+
+    /**
+     * Deletes the referenced file, if present, together with any associated artifacts (e.g., a cloud copy).
+     * @param fileRef the file to delete
+     * @throws HyracksDataException if the deletion fails
+     */
+    void delete(FileReference fileRef) throws HyracksDataException;
+
+    Set<FileReference> list(FileReference dir) throws HyracksDataException;
+
+    Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException;
+
+    void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException;
+
+    byte[] readAllBytes(FileReference fileRef) throws HyracksDataException;
+
+    void copyDirectory(FileReference srcMetadataScopePath, FileReference targetMetadataScopePath)
+            throws HyracksDataException;
+
+    void deleteDirectory(FileReference root) throws HyracksDataException;
+
+    // TODO: Remove in favor of list(dir, filter)
+    Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) throws HyracksDataException;
+
+    boolean exists(FileReference fileRef);
+
+    void create(FileReference fileRef) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index ae49cb6cb3..7644a306bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -46,6 +46,7 @@ import org.apache.logging.log4j.Logger;
 public class IoUtil {
 
     public static final String FILE_NOT_FOUND_MSG = "Deleting non-existing file!";
+    public static final FilenameFilter NO_OP_FILTER = (dir, name) -> true;
     private static final Logger LOGGER = LogManager.getLogger();
 
     private IoUtil() {
@@ -189,4 +190,8 @@ public class IoUtil {
             }
         }
     }
+
+    public static String getFileNameFromPath(String path) {
+        return path.substring(Math.max(path.lastIndexOf('/'), path.lastIndexOf(java.io.File.separatorChar)) + 1);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d68c291554..223c71cedf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -95,5 +95,36 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sdk-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>regions</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>auth</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.findify</groupId>
+      <artifactId>s3mock_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-http-core_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 57bda8b2f7..6c74838523 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -30,7 +30,7 @@ import org.apache.hyracks.api.io.IIOManager;
 
 public class FileHandle implements IFileHandle {
 
-    private final FileReference fileRef;
+    protected final FileReference fileRef;
     private RandomAccessFile raf;
     private String mode;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index dee61a465a..984ab08a6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -26,11 +26,15 @@ import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -68,11 +72,11 @@ public class IOManager implements IIOManager {
     private final BlockingQueue<IoRequest> freeRequests;
     private final List<IODeviceHandle> ioDevices;
     private final List<IODeviceHandle> workspaces;
+    private final IFileDeviceResolver deviceComputer;
     /*
      * Mutables
      */
     private int workspaceIndex;
-    private final IFileDeviceResolver deviceComputer;
 
     public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize)
             throws HyracksDataException {
@@ -103,6 +107,20 @@ public class IOManager implements IIOManager {
         }
     }
 
+    protected IOManager(IOManager ioManager, int queueSize, int ioParallelism) {
+        this.ioDevices = ioManager.ioDevices;
+        this.workspaces = ioManager.workspaces;
+        this.deviceComputer = ioManager.deviceComputer;
+        this.workspaceIndex = 0;
+        this.freeRequests = new ArrayBlockingQueue<>(queueSize);
+        this.submittedRequests = new ArrayBlockingQueue<>(queueSize);
+        final int threadCount = ioParallelism * ioDevices.size();
+        this.executor = Executors.newFixedThreadPool(threadCount);
+        for (int handlerId = 0; handlerId < threadCount; handlerId++) {
+            executor.execute(new IoRequestHandler(handlerId, submittedRequests));
+        }
+    }
+
     public IoRequest getOrAllocRequest() {
         IoRequest request = freeRequests.poll();
         if (request == null) {
@@ -242,7 +260,7 @@ public class IOManager implements IIOManager {
      * @param offset
      * @param data
      * @return The number of bytes read, possibly zero, or -1 if the given offset is greater than or equal to the file's
-     *         current size
+     * current size
      * @throws HyracksDataException
      */
     @Override
@@ -473,4 +491,110 @@ public class IOManager implements IIOManager {
             }
         };
     }
+
+    @Override
+    public void delete(FileReference fileRef) throws HyracksDataException {
+        if (fileRef.getFile().exists()) {
+            IoUtil.delete(fileRef);
+        }
+    }
+
+    @Override
+    public Set<FileReference> list(FileReference dir) throws HyracksDataException {
+        return list(dir, IoUtil.NO_OP_FILTER);
+    }
+
+    @Override
+    public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        /*
+         * Returns an empty set if dir does not exist; otherwise returns the entries of dir accepted by filter.
+         * Throws if dir is not a directory, cannot be read, or listing fails with an unidentified I/O error.
+         */
+        Set<FileReference> listedFiles = new HashSet<>();
+        if (!dir.getFile().exists()) {
+            return listedFiles;
+        }
+
+        String[] files = dir.getFile().list(filter);
+        if (files == null) {
+            if (!dir.getFile().isDirectory()) {
+                throw HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
+            } else if (!dir.getFile().canRead()) {
+                throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, dir);
+            }
+            throw HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
+        }
+
+        for (String file : files) {
+            listedFiles.add(dir.getChild(file));
+        }
+        return listedFiles;
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
+        File file = fileRef.getFile();
+        try {
+            if (file.exists()) {
+                IoUtil.delete(fileRef); // local only: must not dispatch to a subclass delete() (e.g., cloud)
+            } else {
+                FileUtils.createParentDirectories(file);
+            }
+            FileUtil.writeAndForce(file.toPath(), bytes);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
+        if (!fileRef.getFile().exists()) {
+            return null;
+        }
+        try {
+            return Files.readAllBytes(fileRef.getFile().toPath());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void deleteDirectory(FileReference root) throws HyracksDataException {
+        try {
+            FileUtils.deleteDirectory(root.getFile());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter)
+            throws HyracksDataException {
+        Collection<File> files = IoUtil.getMatchingFiles(root.getFile().toPath(), filter);
+        Set<FileReference> fileReferences = new HashSet<>();
+        for (File file : files) {
+            fileReferences.add(resolveAbsolutePath(file.getAbsolutePath()));
+        }
+
+        return fileReferences;
+    }
+
+    @Override
+    public boolean exists(FileReference fileRef) {
+        return fileRef.getFile().exists();
+    }
+
+    @Override
+    public void create(FileReference fileRef) throws HyracksDataException {
+        IoUtil.create(fileRef);
+    }
+
+    @Override
+    public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
+        try {
+            FileUtils.copyDirectory(srcFileRef.getFile(), destFileRef.getFile());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudFileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudFileHandle.java
new file mode 100644
index 0000000000..c7ed9c02c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudFileHandle.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+public class CloudFileHandle extends FileHandle {
+    private final CloudResettableInputStream inputStream;
+
+    public CloudFileHandle(ICloudClient cloudClient, String bucket, FileReference fileRef,
+            WriteBufferProvider bufferProvider) {
+        super(fileRef);
+        ICloudBufferedWriter writer = cloudClient.createBufferedWriter(bucket, fileRef.getRelativePath());
+        this.inputStream = new CloudResettableInputStream(writer, bufferProvider);
+    }
+
+    public CloudResettableInputStream getInputStream() {
+        return inputStream;
+    }
+
+    @Override
+    public void open(IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) throws IOException {
+        if (fileRef.getFile().exists()) {
+            super.open(rwMode, syncMode);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudIOManager.java
new file mode 100644
index 0000000000..f753f4f1c0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudIOManager.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.control.nc.io.cloud.clients.CloudClientProvider;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CloudIOManager extends IOManager {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICloudClient cloudClient;
+    private final WriteBufferProvider writeBufferProvider;
+    private final String bucket;
+
+    // TODO(htowaileb): temporary, will need to be read from somewhere
+    public static final String STORAGE_ROOT_DIR_NAME = "storage";
+
+    public CloudIOManager(IIOManager ioManager, int queueSize, int ioParallelism) throws HyracksDataException {
+        super((IOManager) ioManager, queueSize, ioParallelism);
+
+        // TODO(htowaileb): temporary, the bucket name (first line of /etc/s3) needs to be provided somehow
+        try {
+            List<String> lines = FileUtils.readLines(new File("/etc/s3"), "UTF-8");
+            bucket = lines.get(0);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        cloudClient = CloudClientProvider.getClient(CloudClientProvider.ClientType.S3);
+        int numOfThreads = ioManager.getIODevices().size() * ioParallelism;
+        writeBufferProvider = new WriteBufferProvider(numOfThreads);
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    @Override
+    public long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+        long writtenBytes = super.doSyncWrite(fHandle, offset, dataArray);
+        CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
+        // NOTE(review): mirrors only dataArray[0..1] to cloud storage; confirm callers pass exactly two buffers
+        try {
+            inputStream.write(dataArray[0], dataArray[1]);
+        } catch (HyracksDataException e) {
+            inputStream.abort();
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    @Override
+    public int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        int writtenBytes = super.doSyncWrite(fHandle, offset, data);
+        CloudResettableInputStream cloudInputStream = ((CloudFileHandle) fHandle).getInputStream();
+        try {
+            cloudInputStream.write(data);
+        } catch (HyracksDataException e) {
+            cloudInputStream.abort();
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    @Override
+    public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+            throws HyracksDataException {
+        CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider);
+        if (!super.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
+            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
+            try {
+                // TODO: We need a proper caching mechanism
+                LOGGER.info("Downloading {} from cloud storage..", fileRef.getRelativePath());
+                LocalCacheUtil.download(cloudClient, this, fHandle, rwMode, syncMode, writeBuffer);
+                super.close(fHandle);
+                LOGGER.info("Finished downloading {} from cloud storage..", fileRef.getRelativePath());
+            } finally {
+                writeBufferProvider.recycle(writeBuffer);
+            }
+        }
+        // CloudFileHandle.open() opens the local copy only when it exists
+        try {
+            fHandle.open(rwMode, syncMode);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return fHandle;
+    }
+
+    @Override
+    public void delete(FileReference fileRef) throws HyracksDataException {
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+            // Mirror the delete in cloud storage, except for the storage root dir which is never deleted there
+            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+        }
+        super.delete(fileRef);
+    }
+
+    @Override
+    public void close(IFileHandle fHandle) throws HyracksDataException {
+        try {
+            ((CloudFileHandle) fHandle).getInputStream().close();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        } finally {
+            super.close(fHandle);
+        }
+    }
+
+    @Override
+    public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
+        if (cloudFiles.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        // First get the set of local files
+        Set<FileReference> localFiles = super.list(dir, filter);
+        Iterator<FileReference> localFilesIter = localFiles.iterator();
+
+        // Reconcile local files and cloud files
+        while (localFilesIter.hasNext()) {
+            FileReference file = localFilesIter.next();
+            if (file.getFile().isDirectory()) {
+                continue;
+            }
+
+            String path = file.getRelativePath();
+            if (!cloudFiles.contains(path)) {
+                // Delete local files that do not exist in cloud storage (the ground truth for valid files)
+                localFilesIter.remove();
+                super.delete(file);
+            } else {
+                // No need to re-add it in the following loop
+                cloudFiles.remove(path);
+            }
+        }
+
+        // Add the remaining files that are not stored locally (if any)
+        for (String cloudFile : cloudFiles) {
+            localFiles.add(dir.getChild(IoUtil.getFileNameFromPath(cloudFile)));
+        }
+        return new HashSet<>(localFiles);
+    }
+
+    @Override
+    public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
+        HyracksDataException savedEx = null;
+        if (metadata) {
+            // Only finish the upload when metadata == true, so that syncs issued with metadata == false (e.g., by
+            // the write limiter) do not finish the stream and complete the upload prematurely.
+            CloudResettableInputStream stream = ((CloudFileHandle) fileHandle).getInputStream();
+            try {
+                stream.finish();
+            } catch (HyracksDataException e) {
+                savedEx = e;
+            }
+
+            if (savedEx != null) {
+                try {
+                    stream.abort();
+                } catch (HyracksDataException e) {
+                    savedEx.addSuppressed(e);
+                }
+                throw savedEx;
+            }
+        }
+        // Sync only after finalizing the upload to cloud storage
+        super.sync(fileHandle, metadata);
+    }
+
+    @Override
+    public long getSize(IFileHandle fileHandle) {
+        if (fileHandle.getFileReference().getFile().exists()) {
+            // The local copy holds every byte written so far, even while the cloud upload is still incomplete
+            return super.getSize(fileHandle);
+        }
+        return cloudClient.getObjectSize(bucket, fileHandle.getFileReference().getRelativePath());
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
+        super.overwrite(fileRef, bytes);
+        // Writing replaces the existing cloud object, if any
+        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
+    }
+
+    // TODO utilize locally stored files for reading
+    @Override
+    public int doSyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        if (fHandle.getFileReference().getFile().exists()) {
+            return super.doSyncRead(fHandle, offset, data);
+        }
+        return cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+    }
+
+    // TODO: We need to download this too
+    @Override
+    public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
+        if (fileRef.getFile().exists()) {
+            return super.readAllBytes(fileRef);
+        }
+        return cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
+    }
+
+    @Override
+    public void deleteDirectory(FileReference fileRef) throws HyracksDataException {
+        super.deleteDirectory(fileRef);
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+            // Mirror the delete in cloud storage, except for the storage root dir which is never deleted there
+            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+        }
+    }
+
+    @Override
+    public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) {
+        Set<String> paths = cloudClient.listObjects(bucket, root.getRelativePath(), filter);
+        List<FileReference> fileReferences = new ArrayList<>();
+        for (String path : paths) {
+            fileReferences.add(new FileReference(root.getDeviceHandle(), path));
+        }
+        return fileReferences;
+    }
+
+    @Override
+    public boolean exists(FileReference fileRef) {
+        // Check locally first: newly created (empty) files are not yet stored in cloud storage
+        return fileRef.getFile().exists() || cloudClient.exists(bucket, fileRef.getRelativePath());
+    }
+
+    @Override
+    public void create(FileReference fileRef) throws HyracksDataException {
+        // Delete any stale local copy first: its upload to cloud storage never completed, so the local
+        // and cloud copies are out of sync
+        super.delete(fileRef);
+        super.create(fileRef);
+    }
+
+    @Override
+    public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
+        cloudClient.copy(bucket, srcFileRef.getRelativePath(), destFileRef);
+        super.copyDirectory(srcFileRef, destFileRef);
+    }
+
+    protected long writeLocally(IFileHandle fHandle, long offset, ByteBuffer buffer) throws HyracksDataException {
+        return super.doSyncWrite(fHandle, offset, buffer);
+    }
+
+    protected void syncLocally(IFileHandle fileHandle) throws HyracksDataException {
+        super.sync(fileHandle, true);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudResettableInputStream.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudResettableInputStream.java
new file mode 100644
index 0000000000..84af84f663
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudResettableInputStream.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+
+/**
+ * An {@link InputStream} view over a single pooled write buffer, used to push page writes to cloud storage in
+ * chunks. Data is appended with {@link #write}; whenever the buffer is full (and on {@link #finish()}) it is
+ * flipped and handed to the {@link ICloudBufferedWriter}, which consumes it through this stream's {@code read}
+ * methods. {@link #mark}/{@link #reset} delegate to the buffer's own mark, presumably so the uploader can re-read
+ * a chunk (e.g. on retry) — confirm against the writer implementations.
+ */
+public class CloudResettableInputStream extends InputStream {
+    // TODO: make configurable
+    public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024;
+    private final WriteBufferProvider bufferProvider;
+    private ByteBuffer writeBuffer;
+
+    private final ICloudBufferedWriter bufferedWriter;
+
+    public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, WriteBufferProvider bufferProvider) {
+        this.bufferedWriter = bufferedWriter;
+        this.bufferProvider = bufferProvider;
+    }
+
+    /** Lazily acquires a cleared write buffer from the provider on first use. */
+    private void open() {
+        if (writeBuffer == null) {
+            writeBuffer = bufferProvider.getBuffer();
+            writeBuffer.clear();
+        }
+    }
+
+    @Override
+    public void reset() {
+        writeBuffer.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    @Override
+    public synchronized void mark(int readLimit) {
+        writeBuffer.mark();
+    }
+
+    /** Appends {@code header} followed by {@code page}; see {@link #write(ByteBuffer)}. */
+    public void write(ByteBuffer header, ByteBuffer page) throws HyracksDataException {
+        open();
+        write(header);
+        write(page);
+    }
+
+    /**
+     * Appends bytes {@code [0, page.limit())} of {@code page}'s backing array, uploading the write buffer each time
+     * it fills up. Note that {@code page.position()} is deliberately ignored (and left untouched): callers may pass
+     * a page whose position has already been advanced, e.g. by a preceding local write.
+     *
+     * @return the number of bytes appended ({@code page.limit()})
+     */
+    public int write(ByteBuffer page) throws HyracksDataException {
+        open();
+
+        // amount to write
+        int size = page.limit();
+
+        // full buffer = upload -> write all
+        if (writeBuffer.remaining() == 0) {
+            uploadAndWait();
+        }
+
+        // write partial -> upload -> write -> upload -> ...
+        int offset = 0;
+        int pageRemaining = size;
+        while (pageRemaining > 0) {
+            // enough room to write everything that is left
+            if (writeBuffer.remaining() > pageRemaining) {
+                writeBuffer.put(page.array(), offset, pageRemaining);
+                return size;
+            }
+
+            int remaining = writeBuffer.remaining();
+            writeBuffer.put(page.array(), offset, remaining);
+            pageRemaining -= remaining;
+            offset += remaining;
+            uploadAndWait();
+        }
+
+        return size;
+    }
+
+    /** Uploads any buffered bytes, finishes the cloud upload, and always returns the buffer to the pool. */
+    public void finish() throws HyracksDataException {
+        open();
+        try {
+            if (writeBuffer.position() > 0) {
+                uploadAndWait();
+            }
+            bufferedWriter.finish();
+        } finally {
+            returnBuffer();
+        }
+    }
+
+    /** Aborts the cloud upload and always returns the buffer to the pool. */
+    public void abort() throws HyracksDataException {
+        try {
+            bufferedWriter.abort();
+        } finally {
+            returnBuffer();
+        }
+    }
+
+    /** Flips the buffer, hands its content to the writer (which reads it via this stream), then clears it. */
+    private void uploadAndWait() throws HyracksDataException {
+        writeBuffer.flip();
+        try {
+            bufferedWriter.upload(this, writeBuffer.limit());
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+
+        writeBuffer.clear();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        // InputStream contract: a zero-length request reads nothing and returns 0, even at end of stream
+        if (len == 0) {
+            return 0;
+        }
+        if (!writeBuffer.hasRemaining()) {
+            return -1;
+        }
+
+        int length = Math.min(len, writeBuffer.remaining());
+        writeBuffer.get(b, off, length);
+        return length;
+    }
+
+    @Override
+    public int read() throws IOException {
+        // InputStream contract: return the byte as an unsigned value (0-255), or -1 once the buffer is drained
+        return writeBuffer.hasRemaining() ? writeBuffer.get() & 0xFF : -1;
+    }
+
+    private void returnBuffer() {
+        if (writeBuffer != null) {
+            bufferProvider.recycle(writeBuffer);
+            writeBuffer = null;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/LocalCacheUtil.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/LocalCacheUtil.java
new file mode 100644
index 0000000000..ada32b66ae
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/LocalCacheUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+import org.apache.hyracks.util.file.FileUtil;
+
+// TODO replace with a proper caching mechanism
+public class LocalCacheUtil {
+    private LocalCacheUtil() {
+
+    }
+
+    public static void writeToFile(FileReference fileRef, byte[] bytes) throws HyracksDataException {
+        try {
+            File file = fileRef.getFile();
+            FileUtils.createParentDirectories(file);
+            FileUtil.writeAndForce(file.toPath(), bytes);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    // TODO: replace with proper caching policy
+    public static void download(ICloudClient cloudClient, CloudIOManager ioManager, CloudFileHandle fileHandle,
+            IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode, ByteBuffer writeBuffer)
+            throws HyracksDataException {
+        FileReference fileRef = fileHandle.getFileReference();
+        // write the file locally (the call to open is synchronized hence only one thread can perform this call)
+        try (InputStream inStream = cloudClient.getObjectStream(ioManager.getBucket(), fileRef.getRelativePath())) {
+            File file = fileRef.getFile();
+            FileUtils.createParentDirectories(fileRef.getFile());
+            if (!file.createNewFile()) {
+                throw new IllegalStateException("Couldn't create local file");
+            }
+            fileHandle.open(rwMode, syncMode);
+            LocalCacheUtil.writeToFile(ioManager, fileHandle, inStream, writeBuffer);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private static void writeToFile(CloudIOManager ioManager, IFileHandle fileHandle, InputStream inStream,
+            ByteBuffer writeBuffer) throws HyracksDataException {
+        writeBuffer.clear();
+        try {
+            int position = 0;
+            long offset = 0;
+            int read;
+            while ((read = inStream.read(writeBuffer.array(), position, writeBuffer.remaining())) >= 0) {
+                position += read;
+                writeBuffer.position(position);
+                if (writeBuffer.remaining() == 0) {
+                    offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
+                    position = 0;
+                }
+            }
+
+            if (writeBuffer.position() > 0) {
+                writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
+                ioManager.syncLocally(fileHandle);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+    }
+
+    private static long writeBufferToFile(CloudIOManager ioManager, IFileHandle fileHandle, ByteBuffer writeBuffer,
+            long offset) throws HyracksDataException {
+        writeBuffer.flip();
+        long written = ioManager.writeLocally(fileHandle, offset, writeBuffer);
+        writeBuffer.clear();
+        return written;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/WriteBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/WriteBufferProvider.java
new file mode 100644
index 0000000000..44adf45dd7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/WriteBufferProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import static org.apache.hyracks.control.nc.io.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Bounded pool of reusable write buffers. At most {@code ioParallelism} idle buffers are retained; when the pool is
+ * empty a fresh {@code MIN_BUFFER_SIZE} heap buffer is allocated instead.
+ */
+public class WriteBufferProvider {
+    private final BlockingQueue<ByteBuffer> pool;
+
+    public WriteBufferProvider(int ioParallelism) {
+        this.pool = new ArrayBlockingQueue<>(ioParallelism);
+    }
+
+    /** Returns {@code buffer} to the pool; it is silently dropped when the pool is already full. */
+    public void recycle(ByteBuffer buffer) {
+        pool.offer(buffer);
+    }
+
+    /** Takes an idle buffer from the pool without blocking, or allocates a new one if none is available. */
+    public ByteBuffer getBuffer() {
+        ByteBuffer pooled = pool.poll();
+        return pooled != null ? pooled : ByteBuffer.allocate(MIN_BUFFER_SIZE);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/CloudClientProvider.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/CloudClientProvider.java
new file mode 100644
index 0000000000..72aa566aa4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/CloudClientProvider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.util.HashMap;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.control.nc.io.cloud.clients.azure.blob.AzureBlobCloudClient;
+import org.apache.hyracks.control.nc.io.cloud.clients.gcp.gcs.GCSCloudClient;
+
+public class CloudClientProvider {
+
+    public enum ClientType {
+        NO_OP,
+        S3,
+        AZURE_BLOB,
+        GOOGLE_CLOUD_STORAGE
+    }
+
+    private CloudClientProvider() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static ICloudClient getClient(ClientType clientType) throws HyracksDataException {
+        switch (clientType) {
+            case NO_OP:
+                return NoOpCloudClient.INSTANCE;
+            case S3:
+                // TODO: map should have the config already
+                return new S3CloudClient(new HashMap<>());
+            case AZURE_BLOB:
+                return new AzureBlobCloudClient();
+            case GOOGLE_CLOUD_STORAGE:
+                return new GCSCloudClient();
+            default:
+                throw HyracksDataException.create(new IllegalArgumentException("Unknown cloud client type"));
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudBufferedWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudBufferedWriter.java
new file mode 100644
index 0000000000..8bb2d68578
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudBufferedWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.io.InputStream;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Uploads a single cloud object incrementally, one buffered chunk at a time.
+ */
+public interface ICloudBufferedWriter {
+
+    /**
+     * Uploads the next chunk of the object, read from {@code stream}.
+     *
+     * @param stream source of the chunk's bytes
+     * @param length number of bytes to read from {@code stream} and upload
+     * @return an implementation-defined value for the uploaded chunk. NOTE(review): this was documented as the
+     *         "amount uploaded", but the S3 implementation returns the chunk's part number — confirm the intended
+     *         contract before relying on it
+     */
+    int upload(InputStream stream, int length);
+
+    /**
+     * Finishes the upload after the last chunk has been sent.
+     *
+     * @throws HyracksDataException if the upload cannot be completed
+     */
+    void finish() throws HyracksDataException;
+
+    /**
+     * Aborts the upload, discarding any chunks sent so far.
+     *
+     * @throws HyracksDataException if the abort fails
+     */
+    void abort() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudClient.java
new file mode 100644
index 0000000000..bb12769abc
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+/**
+ * Interface containing methods to perform IO operation on the Cloud Storage
+ */
+public interface ICloudClient {
+
+    /**
+     * Opens a writer that uploads an object in chunks.
+     *
+     * @param bucket target bucket
+     * @param path   target object path within the bucket
+     * @return a writer for that object
+     */
+    ICloudBufferedWriter createBufferedWriter(String bucket, String path);
+
+    /**
+     * Lists the objects under {@code path} in {@code bucket}, keeping only the names accepted by {@code filter}.
+     *
+     * @param bucket bucket to list
+     * @param path   path prefix to list under
+     * @param filter name filter applied to each listed object
+     * @return the accepted object names
+     */
+    Set<String> listObjects(String bucket, String path, FilenameFilter filter);
+
+    /**
+     * Range-reads the object at {@code path}, starting at {@code offset}, filling {@code buffer.remaining()} bytes.
+     *
+     * @param bucket bucket holding the object
+     * @param path   object path
+     * @param offset byte offset in the object at which to start reading
+     * @param buffer destination buffer
+     * TODO(htowaileb) should this be returning the buffer position or the total amount read?
+     * @return the buffer position after the read
+     * @throws HyracksDataException if the read fails
+     */
+    int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException;
+
+    /**
+     * Reads the whole object at {@code path} into memory.
+     *
+     * @param bucket bucket holding the object
+     * @param path   object path
+     * @return the object's content
+     * @throws HyracksDataException if the read fails
+     */
+    byte[] readAllBytes(String bucket, String path) throws HyracksDataException;
+
+    /**
+     * Opens a stream over the object at {@code path}.
+     *
+     * @param bucket bucket holding the object
+     * @param path   object path
+     * @return a stream over the object's content
+     */
+    InputStream getObjectStream(String bucket, String path);
+
+    /**
+     * Stores {@code data} as the object at {@code path}.
+     *
+     * @param bucket target bucket
+     * @param path   target object path
+     * @param data   object content
+     */
+    void write(String bucket, String path, byte[] data);
+
+    /**
+     * Copies the object(s) at {@code srcPath} to the location described by {@code destPath}.
+     *
+     * @param bucket   bucket holding the source
+     * @param srcPath  source path
+     * @param destPath destination, given as a file reference rather than a plain path string
+     */
+    void copy(String bucket, String srcPath, FileReference destPath);
+
+    /**
+     * Deletes the object at {@code path}.
+     *
+     * @param bucket bucket holding the object
+     * @param path   object path
+     */
+    void deleteObject(String bucket, String path);
+
+    /**
+     * Returns the size of the object at {@code path}.
+     *
+     * @param bucket bucket holding the object
+     * @param path   object path
+     * @return the object size
+     */
+    long getObjectSize(String bucket, String path);
+
+    /**
+     * Tells whether an object exists at {@code path}.
+     *
+     * @param bucket bucket to check
+     * @param path   object path
+     * @return {@code true} if the object exists, {@code false} otherwise
+     */
+    boolean exists(String bucket, String path);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/NoOpCloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/NoOpCloudClient.java
new file mode 100644
index 0000000000..2751181cb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/NoOpCloudClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.io.ByteArrayInputStream;
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+/**
+ * {@link ICloudClient} that talks to no cloud at all: writes are discarded, reads see no data, and no object is
+ * ever reported as existing. Returned for {@code CloudClientProvider.ClientType.NO_OP}. Every method returns a
+ * non-null (empty) result so callers never have to special-case this client.
+ */
+public class NoOpCloudClient implements ICloudClient {
+
+    public static final NoOpCloudClient INSTANCE = new NoOpCloudClient();
+
+    /** Writer that accepts and discards every chunk. */
+    private static final ICloudBufferedWriter NO_OP_WRITER = new ICloudBufferedWriter() {
+        @Override
+        public int upload(InputStream stream, int length) {
+            return 0;
+        }
+
+        @Override
+        public void finish() {
+            // nothing was uploaded, nothing to finish
+        }
+
+        @Override
+        public void abort() {
+            // nothing was uploaded, nothing to abort
+        }
+    };
+
+    private NoOpCloudClient() {
+        // do not instantiate
+    }
+
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        // return a discarding writer rather than null so buffered uploads do not fail with a NullPointerException
+        return NO_OP_WRITER;
+    }
+
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        // empty rather than null so callers can iterate the result safely
+        return Collections.emptySet();
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        return 0;
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        return new byte[0];
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        // an empty stream rather than null so readers see end-of-stream instead of a NullPointerException
+        return new ByteArrayInputStream(new byte[0]);
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        // no-op
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        // no-op
+    }
+
+    @Override
+    public void deleteObject(String bucket, String path) {
+        // no-op
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) {
+        return 0;
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) {
+        return false;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3BufferedWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3BufferedWriter.java
new file mode 100644
index 0000000000..a793f40ddd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.aws.s3;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+
+public class S3BufferedWriter implements ICloudBufferedWriter {
+    private final List<CompletedPart> partQueue;
+    private final String path;
+    private final S3Client s3Client;
+    private final String bucket;
+    private String uploadId;
+    private int partNumber;
+    private static final int MAX_RETRIES = 3;
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public S3BufferedWriter(S3Client s3client, String bucket, String path) {
+        this.s3Client = s3client;
+        this.bucket = bucket;
+        this.path = path;
+        partQueue = new ArrayList<>();
+    }
+
+    @Override
+    public int upload(InputStream stream, int length) {
+        setUploadId();
+        UploadPartRequest upReq =
+                UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
+        String etag = s3Client.uploadPart(upReq, RequestBody.fromInputStream(stream, length)).eTag();
+        partQueue.add(CompletedPart.builder().partNumber(partNumber).eTag(etag).build());
+
+        return partNumber++;
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder().parts(partQueue).build();
+        CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
+                .bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
+        int retries = 0;
+        while (true) {
+            try {
+                completeMultipartUpload(completeMultipartUploadRequest);
+                break;
+            } catch (Exception e) {
+                retries++;
+                if (retries == MAX_RETRIES) {
+                    throw HyracksDataException.create(e);
+                }
+                LOGGER.info(() -> "S3 storage write retry, encountered: " + e.getMessage());
+
+                // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 2 ? 1 : 2));
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        s3Client.abortMultipartUpload(
+                AbortMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).build());
+    }
+
+    private void completeMultipartUpload(CompleteMultipartUploadRequest request) throws HyracksDataException {
+        try {
+            s3Client.completeMultipartUpload(request);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void setUploadId() {
+        if (uploadId == null) {
+            CreateMultipartUploadRequest uploadRequest =
+                    CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
+            CreateMultipartUploadResponse uploadResp = s3Client.createMultipartUpload(uploadRequest);
+            uploadId = uploadResp.uploadId();
+            partNumber = 1;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3CloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3CloudClient.java
new file mode 100644
index 0000000000..af1441e8e9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3CloudClient.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.aws.s3;
+
+import static org.apache.hyracks.control.nc.io.cloud.clients.aws.s3.S3Utils.listS3Objects;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class S3CloudClient implements ICloudClient {
+
+    private static final String ACCESS_KEY_ID_FIELD = "accessKeyId";
+    private static final String SECRET_ACCESS_KEY_FIELD = "secretAccessKey";
+    private static final String REGION_FIELD = "region";
+    private static final String ENDPOINT_FIELD = "endpoint";
+    // Amazon S3 DeleteObjects accepts at most 1000 keys per request
+    private static final int DELETE_BATCH_SIZE = 1000;
+
+    private final S3Client s3Client;
+
+    /**
+     * Creates an S3-backed cloud client.
+     *
+     * @param clientConfiguration settings keyed by {@code accessKeyId}, {@code secretAccessKey}, {@code region}
+     *                            and optionally {@code endpoint}. If the map is empty it is populated in place
+     *                            from the temporary {@code /etc/s3} file, so it must be mutable in that case.
+     * @throws HyracksDataException if the fallback file cannot be read or the endpoint is not a valid URI
+     */
+    // TODO fix the throws exception
+    public S3CloudClient(Map<String, String> clientConfiguration) throws HyracksDataException {
+        setClientConfig(clientConfiguration); // TODO: remove later, this is temporary
+        s3Client = buildClient(clientConfiguration);
+    }
+
+    /**
+     * Builds an {@link S3Client} using static credentials, the configured region and, when present,
+     * an endpoint override (e.g. an S3-compatible or mock server).
+     */
+    private S3Client buildClient(Map<String, String> clientConfiguration) throws HyracksDataException {
+        String accessKeyId = clientConfiguration.get(ACCESS_KEY_ID_FIELD);
+        String secretAccessKey = clientConfiguration.get(SECRET_ACCESS_KEY_FIELD);
+        String region = clientConfiguration.get(REGION_FIELD);
+        String endpoint = clientConfiguration.get(ENDPOINT_FIELD);
+
+        AwsCredentialsProvider credentialsProvider =
+                StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+        S3ClientBuilder builder = S3Client.builder();
+        builder.credentialsProvider(credentialsProvider);
+        builder.region(Region.of(region));
+
+        if (endpoint != null) {
+            try {
+                URI uri = new URI(endpoint);
+                builder.endpointOverride(uri);
+            } catch (Exception ex) {
+                throw HyracksDataException.create(ex);
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Temporary fallback: when no configuration was supplied, reads it from {@code /etc/s3}. Lines at indices
+     * 1, 2 and 3 hold the access key id, secret access key and region; an optional line at index 4 holds the
+     * endpoint. Line 0 is not used (NOTE(review): presumably a header line — confirm).
+     */
+    // TODO: temporarily setting the client config, this should be provided
+    private void setClientConfig(Map<String, String> clientConfiguration) throws HyracksDataException {
+        if (!clientConfiguration.isEmpty()) {
+            return;
+        }
+
+        try {
+            List<String> lines = FileUtils.readLines(new File("/etc/s3"), "UTF-8");
+            String accessKeyId = lines.get(1);
+            String secretAccessKey = lines.get(2);
+            String region = lines.get(3);
+
+            clientConfiguration.put(ACCESS_KEY_ID_FIELD, accessKeyId);
+            clientConfiguration.put(SECRET_ACCESS_KEY_FIELD, secretAccessKey);
+            clientConfiguration.put(REGION_FIELD, region);
+
+            if (lines.size() > 4) {
+                String serviceEndpoint = lines.get(4);
+                clientConfiguration.put(ENDPOINT_FIELD, serviceEndpoint);
+            }
+        } catch (IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        return new S3BufferedWriter(s3Client, bucket, path);
+    }
+
+    /**
+     * Returns the full keys of all objects under the {@code path} prefix whose file name (as extracted by
+     * {@link IoUtil#getFileNameFromPath}) is accepted by {@code filter}.
+     */
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
+    }
+
+    /**
+     * Fills the remaining space of {@code buffer} with the object's bytes starting at {@code offset}.
+     *
+     * @param buffer a heap buffer; bytes are written from its position up to its limit
+     * @return the number of bytes read, which equals the buffer's initial remaining space
+     * @throws HyracksDataException  if the read fails
+     * @throws IllegalStateException if the object ends before the buffer is filled
+     */
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        int toRead = buffer.remaining();
+        if (toRead == 0) {
+            // an empty range would yield an invalid Range header ("bytes=N-(N-1)")
+            return 0;
+        }
+        // HTTP byte ranges are inclusive on both ends, so the last requested byte is offset + toRead - 1
+        long readTo = offset + toRead - 1;
+        GetObjectRequest rangeGetObjectRequest =
+                GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
+
+        int totalRead = 0;
+
+        // TODO(htowaileb): add retry logic here
+        try (ResponseInputStream<GetObjectResponse> response = s3Client.getObject(rangeGetObjectRequest)) {
+            while (buffer.remaining() > 0) {
+                // arrayOffset() matters when the buffer is a slice of a larger backing array
+                int read =
+                        response.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                if (read < 0) {
+                    // end of stream before the buffer was filled; reported by the check below
+                    break;
+                }
+                buffer.position(buffer.position() + read);
+                totalRead += read;
+            }
+        } catch (IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+
+        if (buffer.remaining() != 0) {
+            throw new IllegalStateException("Expected buffer remaining = 0, found: " + buffer.remaining());
+        }
+        return totalRead;
+    }
+
+    /**
+     * Reads the whole object into memory.
+     *
+     * @return the object's bytes, or {@code null} if no object exists at {@code path}
+     */
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+        // closing the response stream releases the underlying HTTP connection
+        try (ResponseInputStream<GetObjectResponse> stream = s3Client.getObject(getReq)) {
+            return stream.readAllBytes();
+        } catch (NoSuchKeyException e) {
+            return null;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Opens a stream over the whole object; the caller is responsible for closing it.
+     *
+     * @throws IllegalStateException if no object exists at {@code path}
+     */
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+        try {
+            return s3Client.getObject(getReq);
+        } catch (NoSuchKeyException e) {
+            // This should not happen at least from the only caller of this method
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Uploads {@code data} as a single PUT request to {@code path}. */
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(path).build();
+
+        // TODO(htowaileb): add retry logic here
+        s3Client.putObject(putReq, RequestBody.fromBytes(data));
+    }
+
+    /**
+     * Copies every object under the {@code srcPath} prefix within the same bucket. Each destination key is
+     * {@code destPath.getChildPath(...)} of the source object's file name.
+     */
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
+        for (S3Object object : objects) {
+            String srcKey = object.key();
+            String destKey = destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
+            CopyObjectRequest copyReq = CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
+                    .destinationBucket(bucket).destinationKey(destKey).build();
+            s3Client.copyObject(copyReq);
+        }
+    }
+
+    /** Deletes every object whose key starts with {@code path}; does nothing if there are none. */
+    @Override
+    public void deleteObject(String bucket, String path) {
+        Set<String> fileList = listObjects(bucket, path, IoUtil.NO_OP_FILTER);
+        if (fileList.isEmpty()) {
+            return;
+        }
+
+        List<ObjectIdentifier> objectIdentifiers = new ArrayList<>(fileList.size());
+        for (String file : fileList) {
+            objectIdentifiers.add(ObjectIdentifier.builder().key(file).build());
+        }
+        // a single DeleteObjects request may carry at most DELETE_BATCH_SIZE keys, so delete in chunks
+        for (int start = 0; start < objectIdentifiers.size(); start += DELETE_BATCH_SIZE) {
+            int end = Math.min(start + DELETE_BATCH_SIZE, objectIdentifiers.size());
+            Delete delete = Delete.builder().objects(objectIdentifiers.subList(start, end)).build();
+            DeleteObjectsRequest deleteReq = DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
+            s3Client.deleteObjects(deleteReq);
+        }
+    }
+
+    /**
+     * Returns the size in bytes of the object stored exactly at {@code path}, or 0 if there is none.
+     */
+    @Override
+    public long getObjectSize(String bucket, String path) {
+        List<S3Object> objects = listS3Objects(s3Client, bucket, path);
+        for (S3Object object : objects) {
+            // the listing matches by prefix; only an exact key match describes the requested object
+            if (path.equals(object.key())) {
+                return object.size();
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Returns whether any object key starts with {@code path} (so a "directory" prefix counts as existing).
+     */
+    @Override
+    public boolean exists(String bucket, String path) {
+        List<S3Object> objects = listS3Objects(s3Client, bucket, path);
+        return !objects.isEmpty();
+    }
+
+    /** Keeps the keys whose file name is accepted by {@code filter}; the filter's directory argument is null. */
+    private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
+        Set<String> files = new HashSet<>();
+        for (S3Object s3Object : contents) {
+            String path = s3Object.key();
+            if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
+                files.add(path);
+            }
+        }
+        return files;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3Utils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3Utils.java
new file mode 100644
index 0000000000..e29b61156a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3Utils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.aws.s3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class S3Utils {
+
+    private S3Utils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    /**
+     * Lists every object in {@code bucket} whose key starts with {@code path}, following continuation
+     * tokens until the listing is exhausted.
+     *
+     * @param s3Client client used to issue the ListObjectsV2 requests
+     * @param bucket   bucket to list
+     * @param path     key prefix to match
+     * @return all matching objects from every result page, in listing order (empty if none match)
+     */
+    public static List<S3Object> listS3Objects(S3Client s3Client, String bucket, String path) {
+        List<S3Object> objects = new ArrayList<>();
+        String newMarker = null;
+
+        ListObjectsV2Response listObjectsResponse;
+        ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(bucket);
+        listObjectsBuilder.prefix(path);
+
+        while (true) {
+            // List the objects from the start, or from the last marker in case of truncated result
+            if (newMarker == null) {
+                listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+            } else {
+                listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+            }
+            // every page contributes; previously only the last page's contents were returned
+            objects.addAll(listObjectsResponse.contents());
+
+            // Stop unless S3 explicitly reports more pages; a missing flag or token would otherwise restart
+            // the listing from the beginning and loop forever
+            if (!Boolean.TRUE.equals(listObjectsResponse.isTruncated())) {
+                break;
+            }
+            newMarker = listObjectsResponse.nextContinuationToken();
+            if (newMarker == null) {
+                break;
+            }
+        }
+        return objects;
+    }
+
+    // TODO(htowaileb): Test few runs with default client and see if any failures are encountered
+    //    private static SdkHttpClient buildHttpClient() {
+    //        ApacheHttpClient.Builder apacheClientBuilder = ApacheHttpClient.builder();
+    //
+    //        AttributeMap.Builder overriddenConfigBuilder = AttributeMap.builder();
+    //        overriddenConfigBuilder.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, 128);
+    //        overriddenConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, Duration.ofMinutes(60));
+    //        overriddenConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, Duration.ofMinutes(60));
+    //        overriddenConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT, Duration.ofMinutes(60));
+    //        overriddenConfigBuilder.put(SdkHttpConfigurationOption.READ_TIMEOUT, Duration.ofMinutes(60));
+    //        overriddenConfigBuilder.put(SdkHttpConfigurationOption.WRITE_TIMEOUT, Duration.ofMinutes(60));
+    //        overriddenConfigBuilder.put(SdkHttpConfigurationOption.TCP_KEEPALIVE, Boolean.TRUE);
+    //        AttributeMap configuration = overriddenConfigBuilder.build();
+    //
+    //        return apacheClientBuilder.buildWithDefaults(configuration);
+    //    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/azure/blob/AzureBlobCloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/azure/blob/AzureBlobCloudClient.java
new file mode 100644
index 0000000000..6dfe05ae5c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/azure/blob/AzureBlobCloudClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.azure.blob;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+/**
+ * Placeholder for an Azure Blob Storage {@link ICloudClient}; not yet implemented. Every operation fails
+ * loudly instead of returning fake results (null collections, zero sizes, silent no-op writes).
+ */
+public class AzureBlobCloudClient implements ICloudClient {
+
+    /**
+     * Not yet implemented.
+     *
+     * @throws IllegalStateException always
+     */
+    public AzureBlobCloudClient() {
+        throw new IllegalStateException("NYI");
+    }
+
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        throw notImplemented();
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        throw notImplemented();
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        throw notImplemented();
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        throw notImplemented();
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        throw notImplemented();
+    }
+
+    @Override
+    public void deleteObject(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    /** Builds the exception thrown by every not-yet-implemented operation. */
+    private static UnsupportedOperationException notImplemented() {
+        return new UnsupportedOperationException("NYI");
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/gcp/gcs/GCSCloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/gcp/gcs/GCSCloudClient.java
new file mode 100644
index 0000000000..a9a6f06f4c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/gcp/gcs/GCSCloudClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.gcp.gcs;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+/**
+ * Placeholder for a Google Cloud Storage {@link ICloudClient}; not yet implemented. Every operation fails
+ * loudly instead of returning fake results (null collections, zero sizes, silent no-op writes).
+ */
+public class GCSCloudClient implements ICloudClient {
+
+    /**
+     * Not yet implemented.
+     *
+     * @throws IllegalStateException always
+     */
+    public GCSCloudClient() {
+        throw new IllegalStateException("NYI");
+    }
+
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        throw notImplemented();
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        throw notImplemented();
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        throw notImplemented();
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        throw notImplemented();
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        throw notImplemented();
+    }
+
+    @Override
+    public void deleteObject(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) {
+        throw notImplemented();
+    }
+
+    /** Builds the exception thrown by every not-yet-implemented operation. */
+    private static UnsupportedOperationException notImplemented() {
+        return new UnsupportedOperationException("NYI");
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/LSMTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/LSMTest.java
new file mode 100644
index 0000000000..d62242f827
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/LSMTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.lsm;
+
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.control.nc.io.cloud.CloudResettableInputStream;
+import org.apache.hyracks.control.nc.io.cloud.WriteBufferProvider;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public abstract class LSMTest {
+    public static final Logger LOGGER = LogManager.getLogger();
+
+    public static final String BTREE_SUFFIX = "b";
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    private final static String BUCKET_STORAGE_ROOT = "storage";
+    private static final int BUFFER_SIZE = 136 * 1024 + 5;
+
+    public static ICloudClient CLOUD_CLIENT;
+
+    @Test
+    public void a4deleteTest() {
+        try {
+            CLOUD_CLIENT.deleteObject(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    @Test
+    public void a1writeToS3Test() throws IOException {
+        CloudResettableInputStream stream = null;
+
+        try {
+            ICloudBufferedWriter s3BufferedWriter =
+                    CLOUD_CLIENT.createBufferedWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b");
+            stream = new CloudResettableInputStream(s3BufferedWriter, new WriteBufferProvider(1));
+            ByteBuffer content = createContent(BUFFER_SIZE);
+            int size = 0;
+            for (int i = 0; i < 10; i++) {
+                content.clear();
+                size += stream.write(content);
+            }
+            stream.finish();
+            System.err.println(size);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (stream != null) {
+                stream.abort();
+            }
+        } finally {
+            if (stream != null) {
+                stream.close();
+            }
+        }
+    }
+
+    @Test
+    public void a3readFromS3Test() {
+        try {
+            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+            buffer.clear();
+
+            long offset = BUFFER_SIZE * 4;
+            int read = CLOUD_CLIENT.read(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", offset, buffer);
+            buffer.clear();
+
+            for (int i = 0; i < read; i++) {
+                assert i % 127 == buffer.get();
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    @Test
+    public void a2listTest() {
+        try {
+            FilenameFilter btreeFilter = (dir, name) -> !name.startsWith(".") && name.endsWith(BTREE_SUFFIX);
+            System.err.println((CLOUD_CLIENT.listObjects(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT, btreeFilter)));
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    private ByteBuffer createContent(int size) {
+        byte[] contentArray = new byte[size];
+        for (int i = 0; i < size; i++) {
+            contentArray[i] = (byte) (i % 127);
+        }
+        return ByteBuffer.wrap(contentArray);
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/aws/s3/LSMS3Test.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/aws/s3/LSMS3Test.java
new file mode 100644
index 0000000000..e9d0689377
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/aws/s3/LSMS3Test.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.lsm.aws.s3;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.control.nc.io.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.control.nc.lsm.LSMTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import io.findify.s3mock.S3Mock;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+
+/**
+ * Runs the {@link LSMTest} suite against an in-memory S3 mock server.
+ */
+public class LSMS3Test extends LSMTest {
+
+    private static final int MOCK_SERVER_PORT = 8001;
+    private static final String MOCK_SERVER_HOSTNAME = "http://localhost:" + MOCK_SERVER_PORT;
+    private static final String MOCK_SERVER_REGION = "us-west-2"; // does not matter the value
+    private static final String PLACEHOLDER_VALUE = "randomValue";
+
+    private static S3Client client;
+    private static S3Mock s3MockServer;
+
+    /**
+     * Starts the mock server, recreates the playground bucket and points {@code CLOUD_CLIENT} at the mock.
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        LOGGER.info("LSMS3Test setup");
+        startMockServer();
+        preparePlaygroundBucket();
+        CLOUD_CLIENT = new S3CloudClient(mockClientConfiguration());
+    }
+
+    /** Starts an in-memory S3 mock listening on MOCK_SERVER_PORT. */
+    private static void startMockServer() {
+        LOGGER.info("Starting S3 mock server");
+        s3MockServer = new S3Mock.Builder().withPort(MOCK_SERVER_PORT).withInMemoryBackend().build();
+        s3MockServer.start();
+        LOGGER.info("S3 mock server started successfully");
+    }
+
+    /** Builds an anonymous SDK client for the mock and (re)creates the playground bucket with it. */
+    private static void preparePlaygroundBucket() {
+        LOGGER.info("Creating S3 client to load initial files to S3 mock server");
+        URI mockEndpoint = URI.create(MOCK_SERVER_HOSTNAME);
+        S3ClientBuilder clientBuilder = S3Client.builder();
+        clientBuilder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
+                .endpointOverride(mockEndpoint);
+        client = clientBuilder.build();
+        cleanup();
+        client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
+        LOGGER.info("Client created successfully");
+    }
+
+    /** Configuration for the client under test: placeholder credentials/region, real mock endpoint. */
+    private static Map<String, String> mockClientConfiguration() {
+        Map<String, String> configuration = new HashMap<>();
+        configuration.put("accessKeyId", PLACEHOLDER_VALUE);
+        configuration.put("secretAccessKey", PLACEHOLDER_VALUE);
+        configuration.put("region", PLACEHOLDER_VALUE);
+        configuration.put("endpoint", MOCK_SERVER_HOSTNAME);
+        return configuration;
+    }
+
+    /** Best-effort removal of a leftover playground bucket; any failure is deliberately ignored. */
+    private static void cleanup() {
+        try {
+            client.deleteBucket(DeleteBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
+        } catch (Exception ignored) {
+            // the bucket usually does not exist yet on a fresh mock server
+        }
+    }
+
+    /** Closes the SDK client and stops the mock server, skipping whichever was never created. */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LOGGER.info("Shutting down S3 mock server and client");
+        if (client != null) {
+            client.close();
+        }
+        if (s3MockServer != null) {
+            s3MockServer.shutdown();
+        }
+        LOGGER.info("S3 mock down and client shut down successfully");
+    }
+}
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index 348b5bf4f5..c1b8bfcf6f 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -77,6 +77,7 @@
     <jackson.version>2.14.1</jackson.version>
     <jackson-databind.version>${jackson.version}</jackson-databind.version>
     <netty.version>4.1.87.Final</netty.version>
+    <awsjavasdk.version>2.17.218</awsjavasdk.version>
 
     <implementation.title>Apache Hyracks and Algebricks - ${project.name}</implementation.title>
     <implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -480,6 +481,38 @@
         <artifactId>jetty-util-ajax</artifactId>
         <version>9.4.48.v20220622</version>
       </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>sdk-core</artifactId>
+        <version>${awsjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>s3</artifactId>
+        <version>${awsjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>regions</artifactId>
+        <version>${awsjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>auth</artifactId>
+        <version>${awsjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.findify</groupId>
+        <artifactId>s3mock_2.12</artifactId>
+        <version>0.2.5</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>com.typesafe.akka</groupId>
+        <artifactId>akka-http-core_2.12</artifactId>
+        <version>10.1.0</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <build>