You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/08 02:52:20 UTC
[airavata-mft] 02/02: Validating integrity of transfers using md5
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit ee9c0e3b5fd5b3e8af2e866f14070217826a86ed
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 7 22:52:05 2020 -0400
Validating integrity of transfers using md5
---
.../org/apache/airavata/mft/agent/MFTAgent.java | 14 ++---
.../airavata/mft/agent/TransportMediator.java | 59 +++++++++++++++++-----
.../mft/transport/s3/S3MetadataCollector.java | 2 +-
3 files changed, 56 insertions(+), 19 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 42d5444..b43a6ed 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -123,12 +123,14 @@ public class MFTAgent implements CommandLineRunner {
Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
outConnector.init(request.getDestinationId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
- Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
- MetadataCollector metadataCollector = metadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for input"));
- metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+ Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
+ MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
+ srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+ Optional<MetadataCollector> dstMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getDestinationType());
+ MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
+ dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
- ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
- logger.debug("File size " + metadata.getResourceSize());
mftConsulClient.submitTransferStateToProcess(request.getTransferId(), agentId, new TransferState()
.setState("STARTED")
.setPercentage(0)
@@ -136,7 +138,7 @@ public class MFTAgent implements CommandLineRunner {
.setPublisher(agentId)
.setDescription("Started the transfer"));
- String transferId = mediator.transfer(request.getTransferId(), inConnector, outConnector, metadata, (id, st) -> {
+ String transferId = mediator.transfer(request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector, (id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
} catch (MFTConsulClientException e) {
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 57206ba..95d9851 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -17,9 +17,11 @@
package org.apache.airavata.mft.agent;
+import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,14 +44,18 @@ public class TransportMediator {
executor.shutdown();
}
- public String transfer(String transferId, Connector inConnector, Connector outConnector, ResourceMetadata metadata,
- BiConsumer<String, TransferState> onCallback) throws Exception {
+ public String transfer(TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
+ MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onCallback) throws Exception {
+
+ ResourceMetadata srcMetadata = srcMetadataCollector.getGetResourceMetadata(command.getSourceId(), command.getSourceToken());
+
+ logger.debug("Source file size " + srcMetadata.getResourceSize() + ". MD5 " + srcMetadata.getMd5sum());
DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
ConnectorContext context = new ConnectorContext();
- context.setMetadata(metadata);
+ context.setMetadata(srcMetadata);
context.setStreamBuffer(streamBuffer);
- context.setTransferId(transferId);
+ context.setTransferId(command.getTransferId());
TransferTask recvTask = new TransferTask(inConnector, context);
TransferTask sendTask = new TransferTask(outConnector, context);
@@ -74,17 +80,18 @@ public class TransportMediator {
try {
ft.get();
} catch (InterruptedException e) {
- // Interrupted
+
logger.error("Transfer task interrupted", e);
} catch (ExecutionException e) {
- // Snap, something went wrong in the task! Abort! Abort! Abort!
+
logger.error("One task failed with error", e);
- onCallback.accept(transferId, new TransferState()
+ onCallback.accept(command.getTransferId(), new TransferState()
.setPercentage(0)
.setState("FAILED")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+
for (Future<Integer> f : futureList) {
try {
Thread.sleep(1000);
@@ -97,18 +104,46 @@ public class TransportMediator {
}
}
+ Boolean transferred = destMetadataCollector.isAvailable(command.getDestinationId(), command.getDestinationToken());
+
+ if (!transferred) {
+ logger.error("Transfer completed but resource is not available in destination");
+ throw new Exception("Transfer completed but resource is not available in destination");
+ }
+
+ ResourceMetadata destMetadata = destMetadataCollector.getGetResourceMetadata(command.getDestinationId(),
+ command.getDestinationToken());
+
+ if (destMetadata.getMd5sum().equals(srcMetadata.getMd5sum())) {
+ logger.error("Resource integrity violated. MD5 sums are not matching. Source md5 {} destination md5 {}",
+ srcMetadata.getMd5sum(), destMetadata.getMd5sum());
+ throw new Exception("Resource integrity violated. MD5 sums are not matching. Source md5 " + srcMetadata.getMd5sum()
+ + " destination md5 " + destMetadata.getMd5sum());
+ }
+
+ // Check
+
long endTime = System.nanoTime();
double time = (endTime - startTime) * 1.0 /1000000000;
- onCallback.accept(transferId, new TransferState()
+ onCallback.accept(command.getTransferId(), new TransferState()
.setPercentage(100)
.setState("COMPLETED")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer successfully completed"));
- logger.info("Transfer Speed " + (metadata.getResourceSize() * 1.0 / time) / (1024 * 1024) + " MB/s");
- logger.info("Transfer " + transferId + " completed");
+
+ logger.info("Transfer {} completed. Speed {} MB/s", command.getTransferId(),
+ (srcMetadata.getResourceSize() * 1.0 / time) / (1024 * 1024));
+
} catch (Exception e) {
- logger.error("Transfer {} failed", transferId, e);
+
+ onCallback.accept(command.getTransferId(), new TransferState()
+ .setPercentage(0)
+ .setState("FAILED")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+
+ logger.error("Transfer {} failed", command.getTransferId(), e);
} finally {
inConnector.destroy();
outConnector.destroy();
@@ -117,6 +152,6 @@ public class TransportMediator {
});
monitor.submit(monitorThread);
- return transferId;
+ return command.getTransferId();
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 33529e0..acc2417 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -76,7 +76,7 @@ public class S3MetadataCollector implements MetadataCollector {
ResourceMetadata metadata = new ResourceMetadata();
ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getResourcePath());
metadata.setResourceSize(s3Metadata.getContentLength());
- metadata.setMd5sum(s3Metadata.getContentMD5());
+ metadata.setMd5sum(s3Metadata.getETag());
metadata.setUpdateTime(s3Metadata.getLastModified().getTime());
metadata.setCreatedTime(s3Metadata.getLastModified().getTime());
return metadata;