You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/04/20 22:21:32 UTC
[airavata-mft] branch master updated: Minor bug fixes
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new d0c6037 Minor bug fixes
d0c6037 is described below
commit d0c60371636f111cac67e679f7e2ebc29a634048
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Apr 20 18:21:24 2022 -0400
Minor bug fixes
---
.../airavata/mft/agent/TransportMediator.java | 46 +++++++++++++---------
.../mft/transport/s3/S3OutgoingConnector.java | 1 +
2 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index f084853..0323437 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -73,13 +73,6 @@ public class TransportMediator {
try {
logger.info("Stating transfer {}", transferId);
- long start = System.currentTimeMillis();
-
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(0)
- .setState("RUNNING")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer is ongoing"));
Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
.resolveIncomingStreamingConnector(request.getSourceType());
@@ -91,6 +84,16 @@ public class TransportMediator {
Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
.resolveOutgoingChunkedConnector(request.getDestinationType());
+
+
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(0)
+ .setState("RUNNING")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer is ongoing"));
+
+ long start = System.currentTimeMillis();
+
// Give priority for chunked transfers.
// TODO: Provide a preference at the API level
if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {
@@ -198,7 +201,9 @@ public class TransportMediator {
throw new Exception("No matching connector found to perform the transfer");
}
- long time = (System.currentTimeMillis() - start) / 1000;
+ long endTime = System.currentTimeMillis();
+
+ double time = (endTime - start) / 1000.0;
logger.info("Transfer {} completed. Time {} S. Speed {} MB/s", transferId, time,
(srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));
@@ -206,7 +211,7 @@ public class TransportMediator {
onStatusCallback.accept(transferId, new TransferState()
.setPercentage(100)
.setState("COMPLETED")
- .setUpdateTimeMils(System.currentTimeMillis())
+ .setUpdateTimeMils(endTime)
.setDescription("Transfer successfully completed"));
exitingCallback.accept(transferId, true);
@@ -252,16 +257,21 @@ public class TransportMediator {
@Override
public Integer call() throws Exception {
- if (useStreaming) {
- InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
- uploader.uploadChunk(chunkIdx, startPos, endPos,inputStream);
- } else {
- String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
- downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
- uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
- new File(tempFile).delete();
+ try {
+ if (useStreaming) {
+ InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
+ uploader.uploadChunk(chunkIdx, startPos, endPos, inputStream);
+ } else {
+ String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+ downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
+ uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
+ new File(tempFile).delete();
+ }
+ return chunkIdx;
+ } catch (Exception e) {
+ logger.error("Failed to transfer ", e);
+ throw e;
}
- return chunkIdx;
}
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index e608e9e..b598919 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -108,6 +108,7 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
.withPartSize(endByte - startByte);
UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+ inputStream.close();
this.partETags.add(uploadResult.getPartETag());
logger.debug("Uploaded S3 chunk {} for resource id {} using stream", chunkId, resource.getResourceId());
}