You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/04/20 22:21:32 UTC

[airavata-mft] branch master updated: Minor bug fixes

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new d0c6037  Minor bug fixes
d0c6037 is described below

commit d0c60371636f111cac67e679f7e2ebc29a634048
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Apr 20 18:21:24 2022 -0400

    Minor bug fixes
---
 .../airavata/mft/agent/TransportMediator.java      | 46 +++++++++++++---------
 .../mft/transport/s3/S3OutgoingConnector.java      |  1 +
 2 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index f084853..0323437 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -73,13 +73,6 @@ public class TransportMediator {
         try {
 
             logger.info("Stating transfer {}", transferId);
-            long start = System.currentTimeMillis();
-
-            onStatusCallback.accept(transferId, new TransferState()
-                    .setPercentage(0)
-                    .setState("RUNNING")
-                    .setUpdateTimeMils(System.currentTimeMillis())
-                    .setDescription("Transfer is ongoing"));
 
             Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
                     .resolveIncomingStreamingConnector(request.getSourceType());
@@ -91,6 +84,16 @@ public class TransportMediator {
             Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
                     .resolveOutgoingChunkedConnector(request.getDestinationType());
 
+
+
+            onStatusCallback.accept(transferId, new TransferState()
+                    .setPercentage(0)
+                    .setState("RUNNING")
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setDescription("Transfer is ongoing"));
+
+            long start = System.currentTimeMillis();
+
             // Give priority for chunked transfers.
             // TODO: Provide a preference at the API level
             if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {
@@ -198,7 +201,9 @@ public class TransportMediator {
                 throw new Exception("No matching connector found to perform the transfer");
             }
 
-            long time = (System.currentTimeMillis() - start) / 1000;
+            long endTime = System.currentTimeMillis();
+
+            double time = (endTime - start) / 1000.0;
 
             logger.info("Transfer {} completed. Time {} S.  Speed {} MB/s", transferId, time,
                     (srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));
@@ -206,7 +211,7 @@ public class TransportMediator {
             onStatusCallback.accept(transferId, new TransferState()
                     .setPercentage(100)
                     .setState("COMPLETED")
-                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setUpdateTimeMils(endTime)
                     .setDescription("Transfer successfully completed"));
 
             exitingCallback.accept(transferId, true);
@@ -252,16 +257,21 @@ public class TransportMediator {
 
         @Override
         public Integer call() throws Exception {
-            if (useStreaming) {
-                InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
-                uploader.uploadChunk(chunkIdx, startPos, endPos,inputStream);
-            } else {
-                String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
-                downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
-                uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
-                new File(tempFile).delete();
+            try {
+                if (useStreaming) {
+                    InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
+                    uploader.uploadChunk(chunkIdx, startPos, endPos, inputStream);
+                } else {
+                    String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+                    downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
+                    uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
+                    new File(tempFile).delete();
+                }
+                return chunkIdx;
+            } catch (Exception e) {
+                logger.error("Failed to transfer ", e);
+                throw e;
             }
-            return chunkIdx;
         }
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index e608e9e..b598919 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -108,6 +108,7 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
                 .withPartSize(endByte - startByte);
 
         UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+        inputStream.close();
         this.partETags.add(uploadResult.getPartETag());
         logger.debug("Uploaded S3 chunk {} for resource id {} using stream", chunkId, resource.getResourceId());
     }