You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/08 02:52:18 UTC

[airavata-mft] branch master updated (9550bac -> ee9c0e3)

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git.


    from 9550bac  Replacing a 10x faster Double Stream Buffer into Circular Stream Buffer
     new a2ac56c  DoubleStreamingBuffer documentation
     new ee9c0e3  Validating integrity of transfers using md5

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/airavata/mft/agent/MFTAgent.java    | 14 ++--
 .../airavata/mft/agent/TransportMediator.java      | 59 ++++++++++---
 .../airavata/mft/core/DoubleStreamingBuffer.java   | 98 ++++++++--------------
 .../mft/transport/s3/S3MetadataCollector.java      |  2 +-
 4 files changed, 90 insertions(+), 83 deletions(-)


[airavata-mft] 02/02: Validating integrity of transfers using md5

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit ee9c0e3b5fd5b3e8af2e866f14070217826a86ed
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 7 22:52:05 2020 -0400

    Validating integrity of transfers using md5
---
 .../org/apache/airavata/mft/agent/MFTAgent.java    | 14 ++---
 .../airavata/mft/agent/TransportMediator.java      | 59 +++++++++++++++++-----
 .../mft/transport/s3/S3MetadataCollector.java      |  2 +-
 3 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 42d5444..b43a6ed 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -123,12 +123,14 @@ public class MFTAgent implements CommandLineRunner {
                         Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
                         outConnector.init(request.getDestinationId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
-                        Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
-                        MetadataCollector metadataCollector = metadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for input"));
-                        metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+                        Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
+                        MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
+                        srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+                        Optional<MetadataCollector> dstMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getDestinationType());
+                        MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
+                        dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
-                        ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
-                        logger.debug("File size " + metadata.getResourceSize());
                         mftConsulClient.submitTransferStateToProcess(request.getTransferId(), agentId, new TransferState()
                             .setState("STARTED")
                             .setPercentage(0)
@@ -136,7 +138,7 @@ public class MFTAgent implements CommandLineRunner {
                             .setPublisher(agentId)
                             .setDescription("Started the transfer"));
 
-                        String transferId = mediator.transfer(request.getTransferId(), inConnector, outConnector, metadata, (id, st) -> {
+                        String transferId = mediator.transfer(request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector, (id, st) -> {
                             try {
                                 mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
                             } catch (MFTConsulClientException e) {
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 57206ba..95d9851 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -17,9 +17,11 @@
 
 package org.apache.airavata.mft.agent;
 
+import org.apache.airavata.mft.admin.models.TransferCommand;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.core.*;
 import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,14 +44,18 @@ public class TransportMediator {
         executor.shutdown();
     }
 
-    public String transfer(String transferId, Connector inConnector, Connector outConnector, ResourceMetadata metadata,
-                           BiConsumer<String, TransferState> onCallback) throws Exception {
+    public String transfer(TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
+                           MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onCallback) throws Exception {
+
+        ResourceMetadata srcMetadata = srcMetadataCollector.getGetResourceMetadata(command.getSourceId(), command.getSourceToken());
+
+        logger.debug("Source file size " + srcMetadata.getResourceSize() + ". MD5 " + srcMetadata.getMd5sum());
 
         DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
         ConnectorContext context = new ConnectorContext();
-        context.setMetadata(metadata);
+        context.setMetadata(srcMetadata);
         context.setStreamBuffer(streamBuffer);
-        context.setTransferId(transferId);
+        context.setTransferId(command.getTransferId());
 
         TransferTask recvTask = new TransferTask(inConnector, context);
         TransferTask sendTask = new TransferTask(outConnector, context);
@@ -74,17 +80,18 @@ public class TransportMediator {
                         try {
                             ft.get();
                         } catch (InterruptedException e) {
-                            // Interrupted
+
                             logger.error("Transfer task interrupted", e);
                         } catch (ExecutionException e) {
-                            // Snap, something went wrong in the task! Abort! Abort! Abort!
+
                             logger.error("One task failed with error", e);
 
-                            onCallback.accept(transferId, new TransferState()
+                            onCallback.accept(command.getTransferId(), new TransferState()
                                 .setPercentage(0)
                                 .setState("FAILED")
                                 .setUpdateTimeMils(System.currentTimeMillis())
                                 .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+
                             for (Future<Integer> f : futureList) {
                                 try {
                                     Thread.sleep(1000);
@@ -97,18 +104,46 @@ public class TransportMediator {
                         }
                     }
 
+                    Boolean transferred = destMetadataCollector.isAvailable(command.getDestinationId(), command.getDestinationToken());
+
+                    if (!transferred) {
+                        logger.error("Transfer completed but resource is not available in destination");
+                        throw new Exception("Transfer completed but resource is not available in destination");
+                    }
+
+                    ResourceMetadata destMetadata = destMetadataCollector.getGetResourceMetadata(command.getDestinationId(),
+                                                    command.getDestinationToken());
+
+                    if (destMetadata.getMd5sum().equals(srcMetadata.getMd5sum())) {
+                        logger.error("Resource integrity violated. MD5 sums are not matching. Source md5 {} destination md5 {}",
+                                                            srcMetadata.getMd5sum(), destMetadata.getMd5sum());
+                        throw new Exception("Resource integrity violated. MD5 sums are not matching. Source md5 " + srcMetadata.getMd5sum()
+                                                        + " destination md5 " + destMetadata.getMd5sum());
+                    }
+
+                    // Check
+
                     long endTime = System.nanoTime();
 
                     double time = (endTime - startTime) * 1.0 /1000000000;
-                    onCallback.accept(transferId, new TransferState()
+                    onCallback.accept(command.getTransferId(), new TransferState()
                         .setPercentage(100)
                         .setState("COMPLETED")
                         .setUpdateTimeMils(System.currentTimeMillis())
                         .setDescription("Transfer successfully completed"));
-                    logger.info("Transfer Speed " + (metadata.getResourceSize() * 1.0 / time) / (1024 * 1024) + " MB/s");
-                    logger.info("Transfer " + transferId + " completed");
+
+                    logger.info("Transfer {} completed.  Speed {} MB/s", command.getTransferId(),
+                                                    (srcMetadata.getResourceSize() * 1.0 / time) / (1024 * 1024));
+
                 } catch (Exception e) {
-                    logger.error("Transfer {} failed", transferId, e);
+
+                    onCallback.accept(command.getTransferId(), new TransferState()
+                            .setPercentage(0)
+                            .setState("FAILED")
+                            .setUpdateTimeMils(System.currentTimeMillis())
+                            .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+
+                    logger.error("Transfer {} failed", command.getTransferId(), e);
                 } finally {
                     inConnector.destroy();
                     outConnector.destroy();
@@ -117,6 +152,6 @@ public class TransportMediator {
         });
 
         monitor.submit(monitorThread);
-        return transferId;
+        return command.getTransferId();
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 33529e0..acc2417 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -76,7 +76,7 @@ public class S3MetadataCollector implements MetadataCollector {
         ResourceMetadata metadata = new ResourceMetadata();
         ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getResourcePath());
         metadata.setResourceSize(s3Metadata.getContentLength());
-        metadata.setMd5sum(s3Metadata.getContentMD5());
+        metadata.setMd5sum(s3Metadata.getETag());
         metadata.setUpdateTime(s3Metadata.getLastModified().getTime());
         metadata.setCreatedTime(s3Metadata.getLastModified().getTime());
         return metadata;


[airavata-mft] 01/02: DoubleStreamingBuffer documentation

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit a2ac56c97231ee5e580b1f9f9d5506bba70c8a7a
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 7 22:47:16 2020 -0400

    DoubleStreamingBuffer documentation
---
 .../airavata/mft/core/DoubleStreamingBuffer.java   | 98 ++++++++--------------
 1 file changed, 34 insertions(+), 64 deletions(-)

diff --git a/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java b/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java
index b776f4f..204c118 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java
@@ -3,26 +3,44 @@ package org.apache.airavata.mft.core;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.locks.ReentrantLock;
 
+/**
+ * A thread-safe byte buffer bridging bytes from an output stream to an input stream.
+ * This is an alternative for {@link CircularStreamingBuffer} to avoid synchronization overhead among the read thread
+ * and the write thread. This has a separate input and output stream that should be utilized by two different threads.
+ * Bytes are copied through two internal byte arrays. Always one array is dedicated to writes and another for reads. Once
+ * the write buffer is full and read buffer is empty, read and write threads swap the buffers. This is the only place where
+ * synchronization is enforced.
+ */
 public class DoubleStreamingBuffer {
-    int bufferSize = 2048;
+
+    /*
+    Size of the internal arrays
+     */
+    private int bufferSize = 2048;
+
+    /*
+    Internal buffers
+     */
+    private final byte[] buffer1 = new byte[bufferSize];
+    private final byte[] buffer2 = new byte[bufferSize];
 
     private OutputStream outputStream = new DoubleStreamingBuffer.DSBOutputStream();
     private InputStream inputStream = new DoubleStreamingBuffer.DSBInputStream();
 
-    CyclicBarrier barrier = new CyclicBarrier(2);
-
-    final byte[] buffer1 = new byte[bufferSize];
-    final byte[] buffer2 = new byte[bufferSize];
+    private CyclicBarrier barrier = new CyclicBarrier(2);
 
-    int buf1Remain = 0;
-    int buf2Remain = 0;
+    /*
+    Remaining bytes in each buffer available for read. The read thread subtracts the count once read and the write thread
+    increases the count for writes
+     */
+    private int buf1Remain = 0;
+    private int buf2Remain = 0;
 
-    ReentrantLock buffer1Lock = new ReentrantLock();
-    ReentrantLock buffer2Lock = new ReentrantLock();
+    private ReentrantLock buffer1Lock = new ReentrantLock();
+    private ReentrantLock buffer2Lock = new ReentrantLock();
 
     boolean readBuffer1 = true;
     boolean doneWrite = false;
@@ -35,7 +53,6 @@ public class DoubleStreamingBuffer {
         @Override
         public void close() throws IOException {
             doneWrite = true;
-            System.out.println("Closing");
             if (readBuffer1) {
                 buffer2Lock.unlock();
             } else {
@@ -59,6 +76,7 @@ public class DoubleStreamingBuffer {
                         buffer1Lock.lock();
                     }
 
+                    // wait for reader to enter into read block for the first time
                     barrier.await();
 
                     barrierPassed = true;
@@ -77,6 +95,7 @@ public class DoubleStreamingBuffer {
                     buffer2Lock.unlock();
                     buffer1Lock.lock();
                     try {
+                        // Wait for reader to move into next buffer
                         barrier.await();
                     } catch (Exception e) {
                         throw new IOException();
@@ -92,6 +111,7 @@ public class DoubleStreamingBuffer {
                     buffer1Lock.unlock();
                     buffer2Lock.lock();
                     try {
+                        // Wait for reader to move into next buffer
                         barrier.await();
                     } catch (Exception e) {
                         throw new IOException();
@@ -115,6 +135,7 @@ public class DoubleStreamingBuffer {
                         buffer2Lock.lock();
                     }
 
+                    // wait for writer to enter into write block for the first time
                     barrier.await();
 
                     barrierPassed = true;
@@ -142,6 +163,7 @@ public class DoubleStreamingBuffer {
 
                     readPoint = 0;
                     try {
+                        // Wait for writer to move into next buffer
                         barrier.await();
                     } catch (Exception e) {
                         throw new IOException();
@@ -167,6 +189,7 @@ public class DoubleStreamingBuffer {
                     buffer2Lock.unlock();
                     readPoint = 0;
                     try {
+                        // Wait for writer to move into next buffer
                         barrier.await();
                     } catch (Exception e) {
                         throw new IOException();
@@ -181,64 +204,11 @@ public class DoubleStreamingBuffer {
     }
 
 
-    public static void main(String args[]) throws InterruptedException {
-        DoubleStreamingBuffer dsb = new DoubleStreamingBuffer();
-        CyclicBarrier barrier = new CyclicBarrier(2);
-
-        Thread thread1 = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                System.out.println("Thread 1");
-                try {
-                    barrier.await();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } catch (BrokenBarrierException e) {
-                    e.printStackTrace();
-                }
-                System.out.println("Done Thread 1");
-            }
-        });
-
-        Thread thread2 = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                System.out.println("Thread 2");
-                try {
-                    Thread.sleep(5000);
-                    barrier.await();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } catch (BrokenBarrierException e) {
-                    e.printStackTrace();
-                }
-                System.out.println("Done Thread 2");
-            }
-        });
-
-
-        thread1.start();
-        thread2.start();
-
-        thread1.join();
-        thread2.join();
-
-    }
-
-
     public OutputStream getOutputStream() {
         return outputStream;
     }
 
-    public void setOutputStream(OutputStream outputStream) {
-        this.outputStream = outputStream;
-    }
-
     public InputStream getInputStream() {
         return inputStream;
     }
-
-    public void setInputStream(InputStream inputStream) {
-        this.inputStream = inputStream;
-    }
 }