You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/07 09:21:33 UTC
[airavata-mft] branch master updated: Replacing Circular Stream Buffer
with a 10x faster Double Stream Buffer
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 9550bac Replacing Circular Stream Buffer with a 10x faster Double Stream Buffer
9550bac is described below
commit 9550bac78b19d6ae3160e1073986671a60f7f106
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Apr 7 05:21:16 2020 -0400
Replacing Circular Stream Buffer with a 10x faster Double Stream Buffer
---
.../airavata/mft/agent/TransportMediator.java | 7 +-
core/pom.xml | 8 +
.../airavata/mft/core/CircularStreamingBuffer.java | 16 --
.../apache/airavata/mft/core/ConnectorContext.java | 6 +-
.../airavata/mft/core/DoubleStreamingBuffer.java | 244 +++++++++++++++++++++
.../mft/core/CircularStreamingBufferTest2.java | 78 +++++++
.../airavata/mft/transport/scp/SCPReceiver.java | 4 +-
.../airavata/mft/transport/scp/SCPSender.java | 4 +-
8 files changed, 339 insertions(+), 28 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index c91a3b6..57206ba 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -18,10 +18,7 @@
package org.apache.airavata.mft.agent;
import org.apache.airavata.mft.admin.models.TransferState;
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransferTask;
+import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
@@ -48,7 +45,7 @@ public class TransportMediator {
public String transfer(String transferId, Connector inConnector, Connector outConnector, ResourceMetadata metadata,
BiConsumer<String, TransferState> onCallback) throws Exception {
- CircularStreamingBuffer streamBuffer = new CircularStreamingBuffer();
+ DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
ConnectorContext context = new ConnectorContext();
context.setMetadata(metadata);
context.setStreamBuffer(streamBuffer);
diff --git a/core/pom.xml b/core/pom.xml
index 421214c..839957c 100755
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -32,4 +32,12 @@
<artifactId>mft-core</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java b/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
index 880c0bc..a4a3590 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
@@ -48,22 +48,6 @@ public class CircularStreamingBuffer {
}
public class CSBOutputStream extends OutputStream {
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- for (int i = off; i < len; i ++) {
- try {
- buffer.put(b[i]);
- updateRead(false);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
- }
@Override
public void flush() throws IOException {
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java
index 2634649..a4cd805 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorContext.java
@@ -19,15 +19,15 @@ package org.apache.airavata.mft.core;
public class ConnectorContext {
- private CircularStreamingBuffer streamBuffer;
+ private DoubleStreamingBuffer streamBuffer;
private ResourceMetadata metadata;
private String transferId;
- public CircularStreamingBuffer getStreamBuffer() {
+ public DoubleStreamingBuffer getStreamBuffer() {
return streamBuffer;
}
- public void setStreamBuffer(CircularStreamingBuffer streamBuffer) {
+ public void setStreamBuffer(DoubleStreamingBuffer streamBuffer) {
this.streamBuffer = streamBuffer;
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java b/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java
new file mode 100644
index 0000000..b776f4f
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java
@@ -0,0 +1,244 @@
+package org.apache.airavata.mft.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DoubleStreamingBuffer {
+ int bufferSize = 2048;
+
+ private OutputStream outputStream = new DoubleStreamingBuffer.DSBOutputStream();
+ private InputStream inputStream = new DoubleStreamingBuffer.DSBInputStream();
+
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ final byte[] buffer1 = new byte[bufferSize];
+ final byte[] buffer2 = new byte[bufferSize];
+
+ int buf1Remain = 0;
+ int buf2Remain = 0;
+
+ ReentrantLock buffer1Lock = new ReentrantLock();
+ ReentrantLock buffer2Lock = new ReentrantLock();
+
+ boolean readBuffer1 = true;
+ boolean doneWrite = false;
+ int readPoint = 0;
+
+ boolean barrierPassed = false;
+
+ public class DSBOutputStream extends OutputStream {
+
+ @Override
+ public void close() throws IOException {
+ doneWrite = true;
+ System.out.println("Closing");
+ if (readBuffer1) {
+ buffer2Lock.unlock();
+ } else {
+ buffer1Lock.unlock();
+ }
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new IOException();
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+
+ if (!barrierPassed) {
+ try {
+ if (readBuffer1) {
+ buffer2Lock.lock();
+ } else {
+ buffer1Lock.lock();
+ }
+
+ barrier.await();
+
+ barrierPassed = true;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ //System.out.println("Write " + readBuffer1 + " " + buf1Remain + " " + buf2Remain);
+ if (readBuffer1) {
+ if (buf2Remain < bufferSize) {
+ buffer2[buf2Remain] = (byte)b;
+ buf2Remain ++;
+ } else {
+ barrier.reset();
+ buffer2Lock.unlock();
+ buffer1Lock.lock();
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new IOException();
+ }
+ write(b);
+ }
+ } else {
+ if (buf1Remain < bufferSize) {
+ buffer1[buf1Remain] = (byte)b;
+ buf1Remain++;
+ } else {
+ barrier.reset();
+ buffer1Lock.unlock();
+ buffer2Lock.lock();
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new IOException();
+ }
+ write(b);
+ }
+ }
+ }
+ }
+
+ public class DSBInputStream extends InputStream {
+
+ @Override
+ public int read() throws IOException {
+
+ if (!barrierPassed) {
+ try {
+ if (readBuffer1) {
+ buffer1Lock.lock();
+ } else {
+ buffer2Lock.lock();
+ }
+
+ barrier.await();
+
+ barrierPassed = true;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ //System.out.println("Read " + readBuffer1 + " " + buf1Remain + " " + buf2Remain);
+
+ if (readBuffer1) {
+ if (buf1Remain > 0) {
+ buf1Remain --;
+ //System.out.println("Readval " + (buffer1[readPoint] & 0xff));
+ return buffer1[readPoint++] & 0xff;
+ } else {
+
+ if (doneWrite && buf2Remain <= 0) {
+ //System.out.println("Return -1");
+ return -1;
+ }
+ buffer2Lock.lock();
+ readBuffer1 = false;
+ buffer1Lock.unlock();
+
+ readPoint = 0;
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new IOException();
+ }
+ //return read();
+ buf2Remain --;
+ return buffer2[readPoint++] & 0xff;
+ }
+ } else {
+ if (buf2Remain > 0) {
+ buf2Remain --;
+ //System.out.println("Readval " + (buffer2[readPoint] & 0xff));
+
+ return buffer2[readPoint++] & 0xff;
+ } else {
+
+ if (doneWrite && buf1Remain <= 0) {
+ //System.out.println("Return -1");
+ return -1;
+ }
+ buffer1Lock.lock();
+ readBuffer1 = true;
+ buffer2Lock.unlock();
+ readPoint = 0;
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new IOException();
+ }
+ //return read();
+ buf1Remain --;
+ return buffer1[readPoint++] & 0xff;
+ }
+ }
+
+ }
+ }
+
+
+ public static void main(String args[]) throws InterruptedException {
+ DoubleStreamingBuffer dsb = new DoubleStreamingBuffer();
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Thread thread1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ System.out.println("Thread 1");
+ try {
+ barrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ System.out.println("Done Thread 1");
+ }
+ });
+
+ Thread thread2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ System.out.println("Thread 2");
+ try {
+ Thread.sleep(5000);
+ barrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ System.out.println("Done Thread 2");
+ }
+ });
+
+
+ thread1.start();
+ thread2.start();
+
+ thread1.join();
+ thread2.join();
+
+ }
+
+
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ public void setOutputStream(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ public void setInputStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+}
diff --git a/core/src/test/java/org/apache/airavata/mft/core/CircularStreamingBufferTest2.java b/core/src/test/java/org/apache/airavata/mft/core/CircularStreamingBufferTest2.java
new file mode 100644
index 0000000..a90ed2a
--- /dev/null
+++ b/core/src/test/java/org/apache/airavata/mft/core/CircularStreamingBufferTest2.java
@@ -0,0 +1,78 @@
+package org.apache.airavata.mft.core;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.io.IOException;
+
+public class CircularStreamingBufferTest2 {
+
+ public static void main(String args[]) throws InterruptedException {
+ //final DoubleStreamingBuffer sb = new DoubleStreamingBuffer();
+ final CircularStreamingBuffer sb = new CircularStreamingBuffer();
+
+ String randStr = RandomStringUtils.random(20000000);
+ byte[] randStrBytes = randStr.getBytes();
+ int arrLength = 2000000;
+ byte[] sourceBytes = new byte[arrLength];
+ System.arraycopy(randStrBytes,0,sourceBytes,0,arrLength);
+
+ System.out.println("Source size " + sourceBytes.length);
+ byte[] destBytes = new byte[sourceBytes.length];
+
+ Thread writeThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (byte b : sourceBytes) {
+ sb.getOutputStream().write(b);
+ }
+ sb.getOutputStream().flush();
+ sb.getOutputStream().close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ Thread readThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int val;
+ int counter = 0;
+ while ((val = sb.getInputStream().read()) != -1) {
+ //System.out.println("loop " + val + " " + counter);
+ //Thread.sleep(1);
+ destBytes[counter] = (byte) val;
+ counter ++;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ });
+
+ writeThread.start();
+ readThread.start();
+
+ System.out.println("Started threads");
+ long start = System.nanoTime();
+ writeThread.join();
+ readThread.join();
+ long end = System.nanoTime();
+
+ System.out.println("Both threads completed - time " + (end - start)/1000000);
+
+ //System.out.println(new String(sourceBytes));
+ //System.out.println(new String(destBytes));
+
+ System.out.println("Equal " + new String(sourceBytes).equals(new String(destBytes)));
+
+
+
+
+
+
+ }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index 194796a..3214cb5 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -19,8 +19,8 @@ package org.apache.airavata.mft.transport.scp;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.DoubleStreamingBuffer;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.service.*;
@@ -83,7 +83,7 @@ public class SCPReceiver implements Connector {
logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
}
- private void transferRemoteToStream(Session session, String from, CircularStreamingBuffer streamBuffer) throws Exception {
+ private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
try {
OutputStream outputStream = streamBuffer.getOutputStream();
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index c4f6280..baee067 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -20,8 +20,8 @@ package org.apache.airavata.mft.transport.scp;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.DoubleStreamingBuffer;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
@@ -100,7 +100,7 @@ public class SCPSender implements Connector {
}
}
- private void copyLocalToRemote(Session session, String to, CircularStreamingBuffer streamBuffer, long fileSize) throws JSchException, IOException {
+ private void copyLocalToRemote(Session session, String to, DoubleStreamingBuffer streamBuffer, long fileSize) throws JSchException, IOException {
try {
logger.info("Starting scp send for remote server");
InputStream inputStream = streamBuffer.getInputStream();