You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/07 09:21:33 UTC

[airavata-mft] branch master updated: Replacing Circular Stream Buffer with a 10x faster Double Stream Buffer

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 9550bac  Replacing Circular Stream Buffer with a 10x faster Double Stream Buffer
9550bac is described below

commit 9550bac78b19d6ae3160e1073986671a60f7f106
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 7 05:21:16 2020 -0400

    Replacing Circular Stream Buffer with a 10x faster Double Stream Buffer
---
 .../airavata/mft/agent/TransportMediator.java      |   7 +-
 core/pom.xml                                       |   8 +
 .../airavata/mft/core/CircularStreamingBuffer.java |  16 --
 .../apache/airavata/mft/core/ConnectorContext.java |   6 +-
 .../airavata/mft/core/DoubleStreamingBuffer.java   | 244 +++++++++++++++++++++
 .../mft/core/CircularStreamingBufferTest2.java     |  78 +++++++
 .../airavata/mft/transport/scp/SCPReceiver.java    |   4 +-
 .../airavata/mft/transport/scp/SCPSender.java      |   4 +-
 8 files changed, 339 insertions(+), 28 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index c91a3b6..57206ba 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -18,10 +18,7 @@
 package org.apache.airavata.mft.agent;
 
 import org.apache.airavata.mft.admin.models.TransferState;
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransferTask;
+import org.apache.airavata.mft.core.*;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -48,7 +45,7 @@ public class TransportMediator {
     public String transfer(String transferId, Connector inConnector, Connector outConnector, ResourceMetadata metadata,
                            BiConsumer<String, TransferState> onCallback) throws Exception {
 
-        CircularStreamingBuffer streamBuffer = new CircularStreamingBuffer();
+        DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
         ConnectorContext context = new ConnectorContext();
         context.setMetadata(metadata);
         context.setStreamBuffer(streamBuffer);
diff --git a/core/pom.xml b/core/pom.xml
index 421214c..839957c 100755
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -32,4 +32,12 @@
 
     <artifactId>mft-core</artifactId>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java b/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
index 880c0bc..a4a3590 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
@@ -48,22 +48,6 @@ public class CircularStreamingBuffer {
     }
 
     public class CSBOutputStream extends OutputStream {
-        @Override
-        public void write(byte[] b) throws IOException {
-            write(b, 0, b.length);
-        }
-
-        @Override
-        public void write(byte[] b, int off, int len) throws IOException {
-            for (int i = off; i < len; i ++) {
-                try {
-                    buffer.put(b[i]);
-                    updateRead(false);
-                } catch (InterruptedException e) {
-                    throw new IOException(e);
-                }
-            }
-        }
 
         @Override
         public void flush() throws IOException {
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java
index 2634649..a4cd805 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java
@@ -19,15 +19,15 @@ package org.apache.airavata.mft.core;
 
 public class ConnectorContext {
 
-    private CircularStreamingBuffer streamBuffer;
+    private DoubleStreamingBuffer streamBuffer;
     private ResourceMetadata metadata;
     private String transferId;
 
-    public CircularStreamingBuffer getStreamBuffer() {
+    public DoubleStreamingBuffer getStreamBuffer() {
         return streamBuffer;
     }
 
-    public void setStreamBuffer(CircularStreamingBuffer streamBuffer) {
+    public void setStreamBuffer(DoubleStreamingBuffer streamBuffer) {
         this.streamBuffer = streamBuffer;
     }
 
diff --git a/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java b/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java
new file mode 100644
index 0000000..b776f4f
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java
@@ -0,0 +1,244 @@
+package org.apache.airavata.mft.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DoubleStreamingBuffer { // Byte pipe built on two fixed-size arrays: DSBOutputStream fills one while DSBInputStream drains the other, coordinated by two locks and a 2-party barrier.
+    int bufferSize = 2048; // capacity in bytes of each of buffer1 and buffer2
+
+    private OutputStream outputStream = new DoubleStreamingBuffer.DSBOutputStream(); // writing end handed out by getOutputStream()
+    private InputStream inputStream = new DoubleStreamingBuffer.DSBInputStream(); // reading end handed out by getInputStream()
+
+    CyclicBarrier barrier = new CyclicBarrier(2); // rendezvous point for two parties: the write() side and the read() side
+
+    final byte[] buffer1 = new byte[bufferSize];
+    final byte[] buffer2 = new byte[bufferSize];
+
+    int buf1Remain = 0; // bytes written to buffer1 and not yet read; write() also uses it as the next write index
+    int buf2Remain = 0; // same bookkeeping for buffer2
+
+    ReentrantLock buffer1Lock = new ReentrantLock(); // taken by whichever side is currently working on buffer1
+    ReentrantLock buffer2Lock = new ReentrantLock(); // taken by whichever side is currently working on buffer2
+
+    boolean readBuffer1 = true; // true: read() drains buffer1 and write() fills buffer2; false: the reverse
+    boolean doneWrite = false; // set by DSBOutputStream.close()
+    int readPoint = 0; // index of the next byte read() returns from the buffer being read
+
+    boolean barrierPassed = false; // set once the first write()/read() has passed the initial barrier.await(); NOTE(review): plain non-volatile field, confirm cross-thread visibility
+
+    public class DSBOutputStream extends OutputStream {
+
+        @Override
+        public void close() throws IOException { // Marks the stream finished, releases the writer's buffer lock and waits at the barrier.
+            doneWrite = true; // read() checks this flag before returning -1
+            System.out.println("Closing");
+            if (readBuffer1) { // writer side was filling buffer2
+                buffer2Lock.unlock(); // NOTE(review): assumes this thread locked it in write(); ReentrantLock.unlock() by a non-owner throws IllegalMonitorStateException
+            } else { // writer side was filling buffer1
+                buffer1Lock.unlock(); // NOTE(review): same ownership assumption as above
+            }
+            try {
+                barrier.await(); // presumably pairs with the barrier.await() read() performs after switching buffers; verify for the empty-stream case
+            } catch (Exception e) {
+                throw new IOException(); // NOTE(review): the caught exception is not attached as the cause
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException { // Stores one byte in the buffer not being read; when that buffer is full, swaps buffers with the reader and retries.
+
+            if (!barrierPassed) { // first call: take the writer's buffer lock and wait for the second party
+                try {
+                    if (readBuffer1) {
+                        buffer2Lock.lock(); // writer owns the buffer the reader is not reading
+                    } else {
+                        buffer1Lock.lock();
+                    }
+
+                    barrier.await(); // blocks until a second party reaches the barrier
+
+                    barrierPassed = true;
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+
+            //System.out.println("Write " + readBuffer1 + " " + buf1Remain + " " + buf2Remain);
+            if (readBuffer1) { // reader is on buffer1, so fill buffer2
+                if (buf2Remain < bufferSize) { // room left in buffer2
+                    buffer2[buf2Remain] = (byte)b;
+                    buf2Remain ++;
+                } else { // buffer2 is full: hand it over and move to buffer1
+                    barrier.reset(); // put the barrier back in its initial state before reusing it
+                    buffer2Lock.unlock(); // lets the buffer2Lock.lock() in read() proceed
+                    buffer1Lock.lock(); // waits until read() has released buffer1
+                    try {
+                        barrier.await(); // pairs with the barrier.await() in read() after it sets readBuffer1 = false
+                    } catch (Exception e) {
+                        throw new IOException(); // NOTE(review): the caught exception is not attached as the cause
+                    }
+                    write(b); // retry the same byte; read() is expected to have flipped readBuffer1 by now
+                }
+            } else { // reader is on buffer2, so fill buffer1
+                if (buf1Remain < bufferSize) { // room left in buffer1
+                    buffer1[buf1Remain] = (byte)b;
+                    buf1Remain++;
+                } else { // buffer1 is full: hand it over and move to buffer2
+                    barrier.reset(); // put the barrier back in its initial state before reusing it
+                    buffer1Lock.unlock(); // lets the buffer1Lock.lock() in read() proceed
+                    buffer2Lock.lock(); // waits until read() has released buffer2
+                    try {
+                        barrier.await(); // pairs with the barrier.await() in read() after it sets readBuffer1 = true
+                    } catch (Exception e) {
+                        throw new IOException(); // NOTE(review): the caught exception is not attached as the cause
+                    }
+                    write(b); // retry the same byte; read() is expected to have flipped readBuffer1 by now
+                }
+            }
+        }
+    }
+
+    public class DSBInputStream extends InputStream {
+
+        @Override
+        public int read() throws IOException { // Returns the next byte as 0-255, or -1 once close() was called and the other buffer holds nothing.
+
+            if (!barrierPassed) { // first call: take the reader's buffer lock and wait for the second party
+                try {
+                    if (readBuffer1) {
+                        buffer1Lock.lock(); // reader owns the buffer it reads from
+                    } else {
+                        buffer2Lock.lock();
+                    }
+
+                    barrier.await(); // blocks until a second party reaches the barrier
+
+                    barrierPassed = true;
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+
+            //System.out.println("Read " + readBuffer1 + " " + buf1Remain + " " + buf2Remain);
+
+            if (readBuffer1) { // currently draining buffer1
+                if (buf1Remain > 0) { // unread bytes remain in buffer1
+                    buf1Remain --;
+                    //System.out.println("Readval " + (buffer1[readPoint] & 0xff));
+                    return buffer1[readPoint++] & 0xff; // mask so the byte is returned as an unsigned value
+                } else { // buffer1 is drained
+
+                    if (doneWrite && buf2Remain <= 0) { // writer closed and buffer2 is empty: end of stream
+                        //System.out.println("Return -1");
+                        return -1;
+                    }
+                    buffer2Lock.lock(); // waits until the writer side releases buffer2
+                    readBuffer1 = false; // switch roles: read buffer2 from now on
+                    buffer1Lock.unlock(); // buffer1 becomes available to write()
+
+                    readPoint = 0; // start the new buffer from its beginning
+                    try {
+                        barrier.await(); // pairs with the barrier.await() in write() or close()
+                    } catch (Exception e) {
+                        throw new IOException(); // NOTE(review): the caught exception is not attached as the cause
+                    }
+                    //return read();
+                    buf2Remain --; // NOTE(review): decremented without checking buf2Remain > 0; confirm buffer2 is never empty here
+                    return buffer2[readPoint++] & 0xff;
+                }
+            } else { // currently draining buffer2
+                if (buf2Remain > 0) { // unread bytes remain in buffer2
+                    buf2Remain --;
+                    //System.out.println("Readval " + (buffer2[readPoint] & 0xff));
+
+                    return buffer2[readPoint++] & 0xff; // mask so the byte is returned as an unsigned value
+                } else { // buffer2 is drained
+
+                    if (doneWrite && buf1Remain <= 0) { // writer closed and buffer1 is empty: end of stream
+                        //System.out.println("Return -1");
+                        return -1;
+                    }
+                    buffer1Lock.lock(); // waits until the writer side releases buffer1
+                    readBuffer1 = true; // switch roles: read buffer1 from now on
+                    buffer2Lock.unlock(); // buffer2 becomes available to write()
+                    readPoint = 0; // start the new buffer from its beginning
+                    try {
+                        barrier.await(); // pairs with the barrier.await() in write() or close()
+                    } catch (Exception e) {
+                        throw new IOException(); // NOTE(review): the caught exception is not attached as the cause
+                    }
+                    //return read();
+                    buf1Remain --; // NOTE(review): decremented without checking buf1Remain > 0; confirm buffer1 is never empty here
+                    return buffer1[readPoint++] & 0xff;
+                }
+            }
+
+        }
+    }
+
+
+    public static void main(String args[]) throws InterruptedException { // Stand-alone CyclicBarrier demo: thread 1 waits at a local barrier until thread 2 arrives after sleeping.
+        DoubleStreamingBuffer dsb = new DoubleStreamingBuffer(); // constructed but not used further in this method
+        CyclicBarrier barrier = new CyclicBarrier(2); // local barrier, distinct from the instance field of the same name
+
+        Thread thread1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                System.out.println("Thread 1");
+                try {
+                    barrier.await(); // waits here until thread 2 also reaches the barrier
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (BrokenBarrierException e) {
+                    e.printStackTrace();
+                }
+                System.out.println("Done Thread 1");
+            }
+        });
+
+        Thread thread2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                System.out.println("Thread 2");
+                try {
+                    Thread.sleep(5000); // delay 5 seconds before joining thread 1 at the barrier
+                    barrier.await();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (BrokenBarrierException e) {
+                    e.printStackTrace();
+                }
+                System.out.println("Done Thread 2");
+            }
+        });
+
+
+        thread1.start();
+        thread2.start();
+
+        thread1.join(); // wait for both demo threads to finish before returning
+        thread2.join();
+
+    }
+
+
+    public OutputStream getOutputStream() { // Returns the stream currently used as the writing end.
+        return outputStream;
+    }
+
+    public void setOutputStream(OutputStream outputStream) { // Replaces the writing end with the given stream.
+        this.outputStream = outputStream;
+    }
+
+    public InputStream getInputStream() { // Returns the stream currently used as the reading end.
+        return inputStream;
+    }
+
+    public void setInputStream(InputStream inputStream) { // Replaces the reading end with the given stream.
+        this.inputStream = inputStream;
+    }
+}
diff --git a/core/src/test/java/org/apache/airavata/mft/core/CircularStreamingBufferTest2.java b/core/src/test/java/org/apache/airavata/mft/core/CircularStreamingBufferTest2.java
new file mode 100644
index 0000000..a90ed2a
--- /dev/null
+++ b/core/src/test/java/org/apache/airavata/mft/core/CircularStreamingBufferTest2.java
@@ -0,0 +1,78 @@
+package org.apache.airavata.mft.core;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.io.IOException;
+
+public class CircularStreamingBufferTest2 { // Manual round-trip and timing check driven from main(); no test framework is used.
+
+    public static void main(String args[]) throws InterruptedException { // Streams a byte payload through the buffer on two threads, prints the elapsed time and whether the output equals the input.
+        //final DoubleStreamingBuffer sb = new DoubleStreamingBuffer();
+        final CircularStreamingBuffer sb = new CircularStreamingBuffer(); // buffer exercised by this run; the DoubleStreamingBuffer variant is commented out above
+
+        String randStr = RandomStringUtils.random(20000000); // random string requested with 20000000 characters
+        byte[] randStrBytes = randStr.getBytes(); // encoded with the platform default charset
+        int arrLength = 2000000; // number of bytes actually pushed through the buffer
+        byte[] sourceBytes = new byte[arrLength];
+        System.arraycopy(randStrBytes,0,sourceBytes,0,arrLength); // payload is the first arrLength bytes of the encoded string
+
+        System.out.println("Source size " + sourceBytes.length);
+        byte[] destBytes = new byte[sourceBytes.length]; // filled by the reader thread
+
+        Thread writeThread = new Thread(new Runnable() { // producer: writes every payload byte, then flushes and closes the output stream
+            @Override
+            public void run() {
+                try {
+                    for (byte b : sourceBytes) {
+                        sb.getOutputStream().write(b); // one byte per call
+                    }
+                    sb.getOutputStream().flush();
+                    sb.getOutputStream().close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        Thread readThread = new Thread(new Runnable() { // consumer: reads until the input stream returns -1
+            @Override
+            public void run() {
+                try {
+                    int val;
+                    int counter = 0; // next index to fill in destBytes
+                    while ((val = sb.getInputStream().read()) != -1) {
+                        //System.out.println("loop " + val + " " + counter);
+                        //Thread.sleep(1);
+                        destBytes[counter] = (byte) val;
+                        counter ++;
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+            }
+        });
+
+        writeThread.start();
+        readThread.start();
+
+        System.out.println("Started threads");
+        long start = System.nanoTime(); // taken after both threads were started
+        writeThread.join();
+        readThread.join();
+        long end = System.nanoTime();
+
+        System.out.println("Both threads completed - time " + (end - start)/1000000); // nanoseconds divided by 1000000, i.e. milliseconds
+
+        //System.out.println(new String(sourceBytes));
+        //System.out.println(new String(destBytes));
+
+        System.out.println("Equal " + new String(sourceBytes).equals(new String(destBytes))); // compares the two arrays after decoding both with the platform default charset
+
+
+
+
+
+
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index 194796a..3214cb5 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -19,8 +19,8 @@ package org.apache.airavata.mft.transport.scp;
 
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.DoubleStreamingBuffer;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.service.*;
@@ -83,7 +83,7 @@ public class SCPReceiver implements Connector {
         logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
     }
 
-    private void transferRemoteToStream(Session session, String from, CircularStreamingBuffer streamBuffer) throws Exception {
+    private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
 
         try {
             OutputStream outputStream = streamBuffer.getOutputStream();
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index c4f6280..baee067 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -20,8 +20,8 @@ package org.apache.airavata.mft.transport.scp;
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.DoubleStreamingBuffer;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
@@ -100,7 +100,7 @@ public class SCPSender implements Connector {
         }
     }
 
-    private void copyLocalToRemote(Session session, String to, CircularStreamingBuffer streamBuffer, long fileSize) throws JSchException, IOException {
+    private void copyLocalToRemote(Session session, String to, DoubleStreamingBuffer streamBuffer, long fileSize) throws JSchException, IOException {
         try {
             logger.info("Starting scp send for remote server");
             InputStream inputStream = streamBuffer.getInputStream();