You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/08 02:52:20 UTC

[airavata-mft] 02/02: Validating integrity of transfers using md5

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit ee9c0e3b5fd5b3e8af2e866f14070217826a86ed
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 7 22:52:05 2020 -0400

    Validating integrity of transfers using md5
---
 .../org/apache/airavata/mft/agent/MFTAgent.java    | 14 ++---
 .../airavata/mft/agent/TransportMediator.java      | 59 +++++++++++++++++-----
 .../mft/transport/s3/S3MetadataCollector.java      |  2 +-
 3 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 42d5444..b43a6ed 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -123,12 +123,14 @@ public class MFTAgent implements CommandLineRunner {
                         Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
                         outConnector.init(request.getDestinationId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
-                        Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
-                        MetadataCollector metadataCollector = metadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for input"));
-                        metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+                        Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
+                        MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
+                        srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+                        Optional<MetadataCollector> dstMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getDestinationType());
+                        MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
+                        dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
-                        ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
-                        logger.debug("File size " + metadata.getResourceSize());
                         mftConsulClient.submitTransferStateToProcess(request.getTransferId(), agentId, new TransferState()
                             .setState("STARTED")
                             .setPercentage(0)
@@ -136,7 +138,7 @@ public class MFTAgent implements CommandLineRunner {
                             .setPublisher(agentId)
                             .setDescription("Started the transfer"));
 
-                        String transferId = mediator.transfer(request.getTransferId(), inConnector, outConnector, metadata, (id, st) -> {
+                        String transferId = mediator.transfer(request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector, (id, st) -> {
                             try {
                                 mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
                             } catch (MFTConsulClientException e) {
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 57206ba..95d9851 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -17,9 +17,11 @@
 
 package org.apache.airavata.mft.agent;
 
+import org.apache.airavata.mft.admin.models.TransferCommand;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.core.*;
 import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,14 +44,18 @@ public class TransportMediator {
         executor.shutdown();
     }
 
-    public String transfer(String transferId, Connector inConnector, Connector outConnector, ResourceMetadata metadata,
-                           BiConsumer<String, TransferState> onCallback) throws Exception {
+    public String transfer(TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
+                           MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onCallback) throws Exception {
+
+        ResourceMetadata srcMetadata = srcMetadataCollector.getGetResourceMetadata(command.getSourceId(), command.getSourceToken());
+
+        logger.debug("Source file size " + srcMetadata.getResourceSize() + ". MD5 " + srcMetadata.getMd5sum());
 
         DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
         ConnectorContext context = new ConnectorContext();
-        context.setMetadata(metadata);
+        context.setMetadata(srcMetadata);
         context.setStreamBuffer(streamBuffer);
-        context.setTransferId(transferId);
+        context.setTransferId(command.getTransferId());
 
         TransferTask recvTask = new TransferTask(inConnector, context);
         TransferTask sendTask = new TransferTask(outConnector, context);
@@ -74,17 +80,18 @@ public class TransportMediator {
                         try {
                             ft.get();
                         } catch (InterruptedException e) {
-                            // Interrupted
+
                             logger.error("Transfer task interrupted", e);
                         } catch (ExecutionException e) {
-                            // Snap, something went wrong in the task! Abort! Abort! Abort!
+
                             logger.error("One task failed with error", e);
 
-                            onCallback.accept(transferId, new TransferState()
+                            onCallback.accept(command.getTransferId(), new TransferState()
                                 .setPercentage(0)
                                 .setState("FAILED")
                                 .setUpdateTimeMils(System.currentTimeMillis())
                                 .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+
                             for (Future<Integer> f : futureList) {
                                 try {
                                     Thread.sleep(1000);
@@ -97,18 +104,46 @@ public class TransportMediator {
                         }
                     }
 
+                    Boolean transferred = destMetadataCollector.isAvailable(command.getDestinationId(), command.getDestinationToken());
+
+                    if (!transferred) {
+                        logger.error("Transfer completed but resource is not available in destination");
+                        throw new Exception("Transfer completed but resource is not available in destination");
+                    }
+
+                    ResourceMetadata destMetadata = destMetadataCollector.getGetResourceMetadata(command.getDestinationId(),
+                                                    command.getDestinationToken());
+
+                    if (destMetadata.getMd5sum().equals(srcMetadata.getMd5sum())) {
+                        logger.error("Resource integrity violated. MD5 sums are not matching. Source md5 {} destination md5 {}",
+                                                            srcMetadata.getMd5sum(), destMetadata.getMd5sum());
+                        throw new Exception("Resource integrity violated. MD5 sums are not matching. Source md5 " + srcMetadata.getMd5sum()
+                                                        + " destination md5 " + destMetadata.getMd5sum());
+                    }
+
+                    // Check
+
                     long endTime = System.nanoTime();
 
                     double time = (endTime - startTime) * 1.0 /1000000000;
-                    onCallback.accept(transferId, new TransferState()
+                    onCallback.accept(command.getTransferId(), new TransferState()
                         .setPercentage(100)
                         .setState("COMPLETED")
                         .setUpdateTimeMils(System.currentTimeMillis())
                         .setDescription("Transfer successfully completed"));
-                    logger.info("Transfer Speed " + (metadata.getResourceSize() * 1.0 / time) / (1024 * 1024) + " MB/s");
-                    logger.info("Transfer " + transferId + " completed");
+
+                    logger.info("Transfer {} completed.  Speed {} MB/s", command.getTransferId(),
+                                                    (srcMetadata.getResourceSize() * 1.0 / time) / (1024 * 1024));
+
                 } catch (Exception e) {
-                    logger.error("Transfer {} failed", transferId, e);
+
+                    onCallback.accept(command.getTransferId(), new TransferState()
+                            .setPercentage(0)
+                            .setState("FAILED")
+                            .setUpdateTimeMils(System.currentTimeMillis())
+                            .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+
+                    logger.error("Transfer {} failed", command.getTransferId(), e);
                 } finally {
                     inConnector.destroy();
                     outConnector.destroy();
@@ -117,6 +152,6 @@ public class TransportMediator {
         });
 
         monitor.submit(monitorThread);
-        return transferId;
+        return command.getTransferId();
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 33529e0..acc2417 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -76,7 +76,7 @@ public class S3MetadataCollector implements MetadataCollector {
         ResourceMetadata metadata = new ResourceMetadata();
         ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getResourcePath());
         metadata.setResourceSize(s3Metadata.getContentLength());
-        metadata.setMd5sum(s3Metadata.getContentMD5());
+        metadata.setMd5sum(s3Metadata.getETag());
         metadata.setUpdateTime(s3Metadata.getLastModified().getTime());
         metadata.setCreatedTime(s3Metadata.getLastModified().getTime());
         return metadata;