You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/04/19 02:40:56 UTC

[airavata-mft] branch master updated: Streaming chunked data transfer support

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new e917df5  Streaming chunked data transfer support
e917df5 is described below

commit e917df5eefa87618d0131605824ed8f0a614d326
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Apr 18 22:40:23 2022 -0400

    Streaming chunked data transfer support
---
 .../org/apache/airavata/mft/agent/MFTAgent.java    |  8 ++++-
 .../airavata/mft/agent/TransportMediator.java      | 36 ++++++++++++++++------
 .../mft/core/api/IncomingChunkedConnector.java     |  1 +
 .../mft/core/api/OutgoingChunkedConnector.java     |  1 +
 .../mft/transport/s3/S3IncomingConnector.java      | 10 ++++++
 .../mft/transport/s3/S3OutgoingConnector.java      | 15 +++++++++
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 5a61d31..18e7ba6 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -107,6 +107,9 @@ public class MFTAgent implements CommandLineRunner {
     @org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
     private int chunkedSize;
 
+    @org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
+    private boolean doChunkStream;
+
     private final Semaphore mainHold = new Semaphore(0);
 
     private KVCache transferMessageCache;
@@ -139,7 +142,10 @@ public class MFTAgent implements CommandLineRunner {
     public void init() {
         transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
         rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
-        mediator = new TransportMediator(tempDataDir, concurrentTransfers, concurrentChunkedThreads, chunkedSize);
+        mediator = new TransportMediator(tempDataDir,
+                concurrentTransfers,
+                concurrentChunkedThreads,
+                chunkedSize, doChunkStream);
         transferRequestExecutor = Executors.newFixedThreadPool(concurrentTransfers);
     }
 
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 49de2f8..f084853 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -45,14 +45,20 @@ public class TransportMediator {
 
     private String tempDataDir = "/tmp";
     private final int chunkedSize;
+    private final boolean doChunkStreaming;
 
     private final ExecutorService chunkedExecutorService;
 
-    public TransportMediator(String tempDataDir, int concurrentTransfers, int concurrentChunkedThreads, int chunkedSize) {
+    public TransportMediator(String tempDataDir,
+                             int concurrentTransfers,
+                             int concurrentChunkedThreads,
+                             int chunkedSize,
+                             boolean doChunkStreaming) {
         this.tempDataDir = tempDataDir;
         monitorPool = Executors.newFixedThreadPool(concurrentTransfers);
         this.chunkedSize = chunkedSize;
         chunkedExecutorService = Executors.newFixedThreadPool(concurrentChunkedThreads);
+        this.doChunkStreaming = doChunkStreaming;
     }
 
     public void transferSingleThread(String transferId,
@@ -115,8 +121,10 @@ public class TransportMediator {
                         endPos = fileLength;
                     }
 
-                    String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
-                    completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));
+
+                    completionService.submit(new ChunkMover(inConnector,
+                            outConnector, uploadLength, endPos, chunkIdx,
+                            transferId, doChunkStreaming));
 
                     uploadLength = endPos;
                     chunkIdx++;
@@ -221,30 +229,38 @@ public class TransportMediator {
         monitorPool.shutdown();
     }
 
-    private static class ChunkMover implements Callable<Integer> {
+    private class ChunkMover implements Callable<Integer> {
 
         IncomingChunkedConnector downloader;
         OutgoingChunkedConnector uploader;
         long startPos;
         long endPos;
         int chunkIdx;
-        String tempFile;
+        String transferId;
+        boolean useStreaming;
 
         public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long startPos,
-                          long endPos, int chunkIdx, String tempFile) {
+                          long endPos, int chunkIdx, String transferId, boolean useStreaming) {
             this.downloader = downloader;
             this.uploader = uploader;
             this.startPos = startPos;
             this.endPos = endPos;
             this.chunkIdx = chunkIdx;
-            this.tempFile = tempFile;
+            this.transferId = transferId;
+            this.useStreaming = useStreaming;
         }
 
         @Override
         public Integer call() throws Exception {
-            downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
-            uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
-            new File(tempFile).delete();
+            if (useStreaming) {
+                InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
+                uploader.uploadChunk(chunkIdx, startPos, endPos,inputStream);
+            } else {
+                String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+                downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
+                uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
+                new File(tempFile).delete();
+            }
             return chunkIdx;
         }
     }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
index 24908a2..0848004 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
@@ -4,4 +4,5 @@ import java.io.InputStream;
 
 public interface IncomingChunkedConnector extends BasicConnector {
     public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
+    public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception;
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
index d9cc5b5..77ea2ab 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
@@ -4,4 +4,5 @@ import java.io.InputStream;
 
 public interface OutgoingChunkedConnector extends BasicConnector {
     public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
+    public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception;
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index 29e8263..19ecfaf 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -93,6 +93,16 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt
         logger.debug("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
     }
 
+    @Override
+    public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception {
+        GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath());
+        rangeObjectRequest.setRange(startByte, endByte - 1);
+        logger.debug("Fetching input stream for chunk {} in resource {}", chunkId, resource.getResourceId());
+        S3Object object = s3Client.getObject(rangeObjectRequest);
+        return object.getObjectContent();
+    }
+
     @Override
     public void complete() throws Exception {
 
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index aa59378..e608e9e 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -96,6 +96,21 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
         logger.debug("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
     }
 
+    @Override
+    public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {
+        UploadPartRequest uploadRequest = new UploadPartRequest()
+                .withBucketName(resource.getS3Storage().getBucketName())
+                .withKey(resource.getFile().getResourcePath())
+                .withUploadId(initResponse.getUploadId())
+                .withPartNumber(chunkId + 1)
+                .withFileOffset(0)
+                .withInputStream(inputStream)
+                .withPartSize(endByte - startByte);
+
+        UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+        this.partETags.add(uploadResult.getPartETag());
+        logger.debug("Uploaded S3 chunk {} for resource id {} using stream", chunkId, resource.getResourceId());
+    }
 
     @Override
     public void complete() throws Exception {