You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/04/19 02:40:56 UTC
[airavata-mft] branch master updated: Streaming chunked data transfer support
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new e917df5 Streaming chunked data transfer support
e917df5 is described below
commit e917df5eefa87618d0131605824ed8f0a614d326
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Apr 18 22:40:23 2022 -0400
Streaming chunked data transfer support
---
.../org/apache/airavata/mft/agent/MFTAgent.java | 8 ++++-
.../airavata/mft/agent/TransportMediator.java | 36 ++++++++++++++++------
.../mft/core/api/IncomingChunkedConnector.java | 1 +
.../mft/core/api/OutgoingChunkedConnector.java | 1 +
.../mft/transport/s3/S3IncomingConnector.java | 10 ++++++
.../mft/transport/s3/S3OutgoingConnector.java | 15 +++++++++
6 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 5a61d31..18e7ba6 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -107,6 +107,9 @@ public class MFTAgent implements CommandLineRunner {
@org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
private int chunkedSize;
+ @org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
+ private boolean doChunkStream;
+
private final Semaphore mainHold = new Semaphore(0);
private KVCache transferMessageCache;
@@ -139,7 +142,10 @@ public class MFTAgent implements CommandLineRunner {
public void init() {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
- mediator = new TransportMediator(tempDataDir, concurrentTransfers, concurrentChunkedThreads, chunkedSize);
+ mediator = new TransportMediator(tempDataDir,
+ concurrentTransfers,
+ concurrentChunkedThreads,
+ chunkedSize, doChunkStream);
transferRequestExecutor = Executors.newFixedThreadPool(concurrentTransfers);
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 49de2f8..f084853 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -45,14 +45,20 @@ public class TransportMediator {
private String tempDataDir = "/tmp";
private final int chunkedSize;
+ private final boolean doChunkStreaming;
private final ExecutorService chunkedExecutorService;
- public TransportMediator(String tempDataDir, int concurrentTransfers, int concurrentChunkedThreads, int chunkedSize) {
+ public TransportMediator(String tempDataDir,
+ int concurrentTransfers,
+ int concurrentChunkedThreads,
+ int chunkedSize,
+ boolean doChunkStreaming) {
this.tempDataDir = tempDataDir;
monitorPool = Executors.newFixedThreadPool(concurrentTransfers);
this.chunkedSize = chunkedSize;
chunkedExecutorService = Executors.newFixedThreadPool(concurrentChunkedThreads);
+ this.doChunkStreaming = doChunkStreaming;
}
public void transferSingleThread(String transferId,
@@ -115,8 +121,10 @@ public class TransportMediator {
endPos = fileLength;
}
- String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
- completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));
+
+ completionService.submit(new ChunkMover(inConnector,
+ outConnector, uploadLength, endPos, chunkIdx,
+ transferId, doChunkStreaming));
uploadLength = endPos;
chunkIdx++;
@@ -221,30 +229,38 @@ public class TransportMediator {
monitorPool.shutdown();
}
- private static class ChunkMover implements Callable<Integer> {
+ private class ChunkMover implements Callable<Integer> {
IncomingChunkedConnector downloader;
OutgoingChunkedConnector uploader;
long startPos;
long endPos;
int chunkIdx;
- String tempFile;
+ String transferId;
+ boolean useStreaming;
public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long startPos,
- long endPos, int chunkIdx, String tempFile) {
+ long endPos, int chunkIdx, String transferId, boolean useStreaming) {
this.downloader = downloader;
this.uploader = uploader;
this.startPos = startPos;
this.endPos = endPos;
this.chunkIdx = chunkIdx;
- this.tempFile = tempFile;
+ this.transferId = transferId;
+ this.useStreaming = useStreaming;
}
@Override
public Integer call() throws Exception {
- downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
- uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
- new File(tempFile).delete();
+ if (useStreaming) {
+ InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
+ uploader.uploadChunk(chunkIdx, startPos, endPos,inputStream);
+ } else {
+ String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+ downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
+ uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
+ new File(tempFile).delete();
+ }
return chunkIdx;
}
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
index 24908a2..0848004 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
@@ -4,4 +4,5 @@ import java.io.InputStream;
public interface IncomingChunkedConnector extends BasicConnector {
public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
+ public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception;
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
index d9cc5b5..77ea2ab 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
@@ -4,4 +4,5 @@ import java.io.InputStream;
public interface OutgoingChunkedConnector extends BasicConnector {
public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
+ public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception;
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index 29e8263..19ecfaf 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -93,6 +93,16 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt
logger.debug("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
}
+ @Override
+ public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception {
+ GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
+ resource.getFile().getResourcePath());
+ rangeObjectRequest.setRange(startByte, endByte - 1);
+ logger.debug("Fetching input stream for chunk {} in resource {}", chunkId, resource.getResourceId());
+ S3Object object = s3Client.getObject(rangeObjectRequest);
+ return object.getObjectContent();
+ }
+
@Override
public void complete() throws Exception {
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index aa59378..e608e9e 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -96,6 +96,21 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
logger.debug("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
}
+ @Override
+ public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {
+ UploadPartRequest uploadRequest = new UploadPartRequest()
+ .withBucketName(resource.getS3Storage().getBucketName())
+ .withKey(resource.getFile().getResourcePath())
+ .withUploadId(initResponse.getUploadId())
+ .withPartNumber(chunkId + 1)
+ .withFileOffset(0)
+ .withInputStream(inputStream)
+ .withPartSize(endByte - startByte);
+
+ UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+ this.partETags.add(uploadResult.getPartETag());
+ logger.debug("Uploaded S3 chunk {} for resource id {} using stream", chunkId, resource.getResourceId());
+ }
@Override
public void complete() throws Exception {