You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/08/22 16:42:09 UTC
[airavata-mft] 03/22: Initial stream based data movement framework
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 036213f18f6b8a473a620481b3826535ee522e46
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Aug 6 02:45:28 2019 -0400
Initial stream based data movement framework
---
core/pom.xml | 15 ++
.../airavata/mft/core/api/StreamedReceiver.java | 7 +
.../airavata/mft/core/api/StreamedSender.java | 7 +
.../core/streaming/DoubleByteArrayInputStream.java | 36 +++++
.../streaming/DoubleByteArrayOutputStream.java | 72 +++++++++
.../mft/core/streaming/TransportStream.java | 47 ++++++
pom.xml | 82 ++++++++++
transport/pom.xml | 19 +++
transport/scp-transport/pom.xml | 27 ++++
.../apache/airavata/mft/transport/scp/Main.java | 63 ++++++++
.../airavata/mft/transport/scp/SCPReceiver.java | 173 +++++++++++++++++++++
.../airavata/mft/transport/scp/SCPSender.java | 11 ++
.../mft/transport/scp/SSHResourceIdentifier.java | 67 ++++++++
13 files changed, 626 insertions(+)
diff --git a/core/pom.xml b/core/pom.xml
new file mode 100644
index 0000000..c87446e
--- /dev/null
+++ b/core/pom.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent> <!-- Inherits groupId, version and build settings from the airavata-mft root POM. -->
+ <artifactId>airavata-mft</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.18-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>mft-core</artifactId> <!-- Core module; no packaging element, so Maven's default (jar) applies. No dependencies are declared here. -->
+
+
+</project>
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/StreamedReceiver.java b/core/src/main/java/org/apache/airavata/mft/core/api/StreamedReceiver.java
new file mode 100644
index 0000000..47db51e
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/StreamedReceiver.java
@@ -0,0 +1,7 @@
+package org.apache.airavata.mft.core.api;
+
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+/**
+ * Contract for a transport that receives a resource into a {@link TransportStream}.
+ */
+public interface StreamedReceiver {
+
+    /**
+     * Receives the resource named by {@code resourceIdentifier} using the supplied stream.
+     *
+     * @param resourceIdentifier identifier of the resource to fetch; how it is resolved is left to the
+     *                           implementation
+     * @param stream             transport stream the implementation works against (presumably the received
+     *                           bytes are written to its output side; confirm per implementation)
+     * @throws Exception if the implementation fails for any reason
+     */
+    void receive(String resourceIdentifier, TransportStream stream) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/StreamedSender.java b/core/src/main/java/org/apache/airavata/mft/core/api/StreamedSender.java
new file mode 100644
index 0000000..9d3986c
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/StreamedSender.java
@@ -0,0 +1,7 @@
+package org.apache.airavata.mft.core.api;
+
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+/**
+ * Contract for a transport that sends a resource taken from a {@link TransportStream}.
+ */
+public interface StreamedSender {
+
+    /**
+     * Sends data for the resource named by {@code resourceIdentifier} using the supplied stream.
+     *
+     * @param resourceIdentifier identifier of the destination resource; how it is resolved is left to the
+     *                           implementation
+     * @param stream             transport stream the implementation works against (presumably the bytes to
+     *                           send are read from its input side; confirm per implementation)
+     * @throws Exception if the implementation fails for any reason
+     */
+    void send(String resourceIdentifier, TransportStream stream) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayInputStream.java b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayInputStream.java
new file mode 100644
index 0000000..56f5dc5
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayInputStream.java
@@ -0,0 +1,36 @@
+package org.apache.airavata.mft.core.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Read side of a {@link DoubleByteArrayOutputStream}.
+ *
+ * <p>The stream serves bytes from a snapshot obtained through
+ * {@link DoubleByteArrayOutputStream#asInputStream()}. Whenever the current snapshot has no bytes left,
+ * the next call to {@link #read()} or {@link #available()} asks the output stream for a fresh snapshot
+ * and switches to it if one is handed out.
+ */
+public class DoubleByteArrayInputStream extends InputStream {
+
+    private final DoubleByteArrayOutputStream source;
+    private InputStream snapshot;
+
+    /**
+     * @param outputStream the output stream whose buffered bytes this stream exposes; a first snapshot is
+     *                     taken immediately
+     */
+    public DoubleByteArrayInputStream(DoubleByteArrayOutputStream outputStream) {
+        this.source = outputStream;
+        this.snapshot = outputStream.asInputStream();
+    }
+
+    /**
+     * Reads one byte from the current snapshot, switching to a newer snapshot first if the current one
+     * is drained.
+     *
+     * @return the byte read, or whatever the underlying snapshot returns when it has no bytes left
+     */
+    @Override
+    public int read() throws IOException {
+        switchSnapshotIfDrained();
+        return snapshot.read();
+    }
+
+    /**
+     * @return the number of bytes left in the current snapshot, after switching to a newer snapshot if
+     *         the current one is drained
+     */
+    @Override
+    public int available() throws IOException {
+        switchSnapshotIfDrained();
+        return snapshot.available();
+    }
+
+    /** Replaces a drained snapshot with the next one, when the output stream hands one out. */
+    private void switchSnapshotIfDrained() throws IOException {
+        if (snapshot.available() != 0) {
+            return;
+        }
+        InputStream next = source.asInputStream();
+        if (next != null) {
+            snapshot = next;
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
new file mode 100644
index 0000000..416a8eb
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
@@ -0,0 +1,72 @@
+package org.apache.airavata.mft.core.streaming;
+
+import java.io.*;
+
+/**
+ * An {@link OutputStream} that alternates between two in-memory buffers so that one buffer can be
+ * consumed through {@link #asInputStream()} while the other is being filled.
+ *
+ * <p>Bytes go to the active buffer. Once more than {@code maxBytesPerStream} bytes have been written to
+ * it, the next write waits until the non-active buffer has been handed out via {@link #asInputStream()},
+ * then swaps the buffers and resets the new active one.
+ *
+ * <p>The class is meant to be used by one writer thread and one reader thread; the hand-over flags are
+ * {@code volatile} so each side sees the other's updates.
+ *
+ * <p>NOTE(review): {@link #asInputStream()} only ever exposes the non-active buffer, so bytes still
+ * sitting in the active buffer when writing stops are never handed to the reader. Confirm whether a
+ * final hand-over step is needed.
+ */
+public class DoubleByteArrayOutputStream extends OutputStream {
+
+    private final ByteArrayOutputStream stream1 = new ByteArrayOutputStream();
+    private final ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
+
+    // Only touched by the writer thread.
+    private ByteArrayOutputStream currentStream = stream1;
+    // Read by the reader thread in asInputStream(), hence volatile.
+    private volatile int activeStream = 1;
+
+    private final long maxBytesPerStream = 1 * 1000 * 1000;
+    private long processedBytes = 0;
+    // Set by the reader once the non-active buffer has been handed out, cleared by the writer on swap.
+    private volatile boolean clearedNonActiveStream = false;
+
+    /** Writes a single byte, counting it towards the active buffer's limit. */
+    @Override
+    public void write(int b) throws IOException {
+        swapBuffersIfFull();
+        this.currentStream.write(b);
+        processedBytes++;
+    }
+
+    /**
+     * Writes {@code len} bytes to the active buffer, swapping buffers first if the active one is over
+     * its limit.
+     *
+     * @throws InterruptedIOException if the thread is interrupted while waiting for the reader
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        swapBuffersIfFull();
+        this.currentStream.write(b, off, len);
+        processedBytes += len;
+    }
+
+    /** Writes the whole array through {@link #write(byte[], int, int)} so the bytes are accounted for. */
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        this.currentStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.stream1.close();
+        this.stream2.close();
+    }
+
+    /**
+     * Hands out the content of the non-active buffer.
+     *
+     * @return a stream over a copy of the non-active buffer, or {@code null} if that buffer has already
+     *         been handed out since the last swap
+     */
+    public InputStream asInputStream() {
+        if (clearedNonActiveStream) {
+            return null;
+        }
+        ByteArrayInputStream inputStream =
+                new ByteArrayInputStream(activeStream == 1 ? stream2.toByteArray() : stream1.toByteArray());
+        this.clearedNonActiveStream = true;
+        return inputStream;
+    }
+
+    /**
+     * Swaps the active and non-active buffers once the active one is over its limit, waiting until the
+     * non-active buffer has been handed out to the reader.
+     */
+    private void swapBuffersIfFull() throws IOException {
+        if (processedBytes <= maxBytesPerStream) {
+            return;
+        }
+        while (!clearedNonActiveStream) {
+            try {
+                Thread.sleep(100);
+                System.out.println("Waiting until non active buffer gets emptied");
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to callers instead of spinning uninterruptibly.
+                Thread.currentThread().interrupt();
+                InterruptedIOException interrupted =
+                        new InterruptedIOException("Interrupted while waiting for the non active buffer to be consumed");
+                interrupted.initCause(e);
+                throw interrupted;
+            }
+        }
+        if (activeStream == 1) {
+            activeStream = 2;
+            currentStream = stream2;
+        } else {
+            activeStream = 1;
+            currentStream = stream1;
+        }
+        currentStream.reset();
+        processedBytes = 0;
+        // Volatile write last: publishes the swap to the reader.
+        clearedNonActiveStream = false;
+    }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/streaming/TransportStream.java b/core/src/main/java/org/apache/airavata/mft/core/streaming/TransportStream.java
new file mode 100644
index 0000000..d8a95b6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/streaming/TransportStream.java
@@ -0,0 +1,47 @@
+package org.apache.airavata.mft.core.streaming;
+
+import java.io.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Bundles the pieces shared between the two ends of a transfer: an output stream, an input stream, the
+ * length of the transferred content and a completion flag.
+ *
+ * <p>By default the output side is a {@link DoubleByteArrayOutputStream} and the input side is a
+ * {@link DoubleByteArrayInputStream} reading from it. The length starts at {@code -1} and the
+ * completion flag at {@code false}.
+ */
+public class TransportStream {
+
+    private OutputStream outputStream;
+    private InputStream inputStream;
+    private long length;
+    private AtomicBoolean streamCompleted;
+
+    /**
+     * Creates a transport stream wired to a fresh pair of double-buffered streams.
+     *
+     * @throws IOException declared by the original signature; nothing in this constructor throws it
+     */
+    public TransportStream() throws IOException {
+        DoubleByteArrayOutputStream buffer = new DoubleByteArrayOutputStream();
+        this.outputStream = buffer;
+        this.inputStream = new DoubleByteArrayInputStream(buffer);
+        this.length = -1;
+        this.streamCompleted = new AtomicBoolean(false);
+    }
+
+    /** @return the stream to read transferred bytes from */
+    public InputStream getInputStream() {
+        return this.inputStream;
+    }
+
+    /** @param inputStream replacement for the read side */
+    public void setInputStream(InputStream inputStream) {
+        this.inputStream = inputStream;
+    }
+
+    /** @return the stream to write transferred bytes to */
+    public OutputStream getOutputStream() {
+        return this.outputStream;
+    }
+
+    /** @param outputStream replacement for the write side */
+    public void setOutputStream(OutputStream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    /** @return the content length, or {@code -1} if it has not been set */
+    public long getLength() {
+        return this.length;
+    }
+
+    /** @param length the content length */
+    public void setLength(long length) {
+        this.length = length;
+    }
+
+    /** @return the flag that marks the stream as completed */
+    public AtomicBoolean getStreamCompleted() {
+        return this.streamCompleted;
+    }
+
+    /** @param streamCompleted replacement for the completion flag */
+    public void setStreamCompleted(AtomicBoolean streamCompleted) {
+        this.streamCompleted = streamCompleted;
+    }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..983b1cc
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <prerequisites>
+        <maven>3.0</maven>
+    </prerequisites>
+    <!-- Aggregated modules: the core streaming API and the transport implementations. -->
+    <modules>
+        <module>core</module>
+        <module>transport</module>
+    </modules>
+
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>21</version>
+    </parent>
+
+    <groupId>org.apache.airavata</groupId>
+    <artifactId>airavata-mft</artifactId>
+    <packaging>pom</packaging>
+    <name>Airavata MFT</name>
+    <version>0.18-SNAPSHOT</version>
+
+    <url>http://airavata.apache.org/</url>
+    <inceptionYear>2011</inceptionYear>
+
+    <!-- This project lives in the airavata-mft repository, so the SCM coordinates point there rather
+         than at the main airavata repository. -->
+    <scm>
+        <connection>scm:git:https://github.com/apache/airavata-mft.git</connection>
+        <developerConnection>scm:git:https://github.com/apache/airavata-mft.git</developerConnection>
+        <url>https://github.com/apache/airavata-mft</url>
+        <tag>HEAD</tag>
+    </scm>
+
+    <mailingLists>
+
+        <mailingList>
+            <name>Airavata Developer List</name>
+            <subscribe>dev-subscribe@airavata.apache.org</subscribe>
+            <unsubscribe>dev-unsubscribe@airavata.apache.org</unsubscribe>
+            <post>mailto:dev@airavata.apache.org</post>
+            <archive>http://mail-archives.apache.org/mod_mbox/airavata-dev/</archive>
+        </mailingList>
+
+        <mailingList>
+            <name>Airavata Users List</name>
+            <subscribe>users-subscribe@airavata.apache.org</subscribe>
+            <unsubscribe>users-unsubscribe@airavata.apache.org</unsubscribe>
+            <post>mailto:users@airavata.apache.org</post>
+            <archive>http://mail-archives.apache.org/mod_mbox/airavata-users/</archive>
+        </mailingList>
+
+    </mailingLists>
+
+    <issueManagement>
+        <url>https://issues.apache.org/jira/browse/AIRAVATA</url>
+    </issueManagement>
+
+</project>
\ No newline at end of file
diff --git a/transport/pom.xml b/transport/pom.xml
new file mode 100644
index 0000000..5e26abd
--- /dev/null
+++ b/transport/pom.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent> <!-- Inherits groupId, version and build settings from the airavata-mft root POM. -->
+ <artifactId>airavata-mft</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.18-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>mft-transport</artifactId>
+ <packaging>pom</packaging> <!-- Aggregator only: produces no artifact of its own. -->
+ <modules> <!-- One child module per transport implementation; only SCP is listed here. -->
+ <module>scp-transport</module>
+ </modules>
+
+
+</project>
\ No newline at end of file
diff --git a/transport/scp-transport/pom.xml b/transport/scp-transport/pom.xml
new file mode 100644
index 0000000..f48bd0d
--- /dev/null
+++ b/transport/scp-transport/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>mft-transport</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.18-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>mft-scp-transport</artifactId>
+
+    <dependencies>
+        <!-- SSH/SCP client library used by the SCP transport. -->
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.55</version>
+        </dependency>
+        <!-- Sibling module: follow this project's version so the two cannot drift apart on a version bump. -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
new file mode 100644
index 0000000..33fdb9f
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
@@ -0,0 +1,63 @@
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.JSchException;
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+import java.io.IOException;
+
+/**
+ * Manual smoke test for the SCP receiver: one thread receives a remote file into a
+ * {@link TransportStream} while a second thread drains the stream and counts the bytes it reads.
+ */
+public class Main {
+
+    /**
+     * Starts the reader and receiver threads, waits for both to finish and then prints {@code Done}.
+     *
+     * @param arg ignored
+     * @throws IOException   if the transport stream cannot be created
+     * @throws JSchException kept for signature compatibility; failures inside the receiver thread are
+     *                       reported there
+     */
+    public static void main(final String[] arg) throws IOException, JSchException {
+
+        final TransportStream stream = new TransportStream();
+
+        Runnable reader = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    long read = 0;
+                    while (true) {
+                        if (stream.getInputStream().available() > 0) {
+                            stream.getInputStream().read();
+                            read++;
+                        } else if (stream.getStreamCompleted().get()) {
+                            break;
+                        } else {
+                            try {
+                                Thread.sleep(100);
+                                System.out.println("Waiting " + read);
+                            } catch (InterruptedException e) {
+                                // Stop polling and leave the interrupt flag set for whoever owns the thread.
+                                Thread.currentThread().interrupt();
+                                break;
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        Runnable receiver = new Runnable() {
+            @Override
+            public void run() {
+                SCPReceiver scpReceiver = new SCPReceiver();
+                try {
+                    scpReceiver.receive("1", stream);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    // Release the reader even when the transfer failed; otherwise it would poll forever.
+                    stream.getStreamCompleted().set(true);
+                }
+            }
+        };
+
+        Thread readerThread = new Thread(reader);
+        readerThread.start();
+
+        Thread receiverThread = new Thread(receiver);
+        receiverThread.start();
+
+        try {
+            // Wait for both sides so that "Done" is only printed once the work has finished.
+            receiverThread.join();
+            readerThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+        }
+        System.out.println("Done");
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
new file mode 100644
index 0000000..bba84b8
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -0,0 +1,173 @@
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.*;
+import org.apache.airavata.mft.core.api.StreamedReceiver;
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+import java.io.*;
+import java.util.Properties;
+
+/**
+ * {@link StreamedReceiver} that fetches a remote file over SSH by running {@code scp -f} on the remote
+ * host and copying the file content into the output side of a {@link TransportStream}.
+ */
+public class SCPReceiver implements StreamedReceiver {
+
+    /**
+     * Resolves the resource, opens an SSH session and streams the remote file into {@code stream}.
+     *
+     * @param resourceIdentifier identifier of the resource to fetch (currently not used for the lookup)
+     * @param stream             receives the file length, the file bytes and the completion flag
+     * @throws Exception if the session cannot be established or the transfer fails
+     */
+    @Override
+    public void receive(String resourceIdentifier, TransportStream stream) throws Exception {
+        SSHResourceIdentifier sshResourceIdentifier = getSSHResourceIdentifier(resourceIdentifier);
+        Session session = createSession(sshResourceIdentifier.getUser(), sshResourceIdentifier.getHost(),
+                sshResourceIdentifier.getPort(),
+                sshResourceIdentifier.getKeyFile(),
+                sshResourceIdentifier.getKeyPassphrase());
+        transferRemoteToStream(session, sshResourceIdentifier.getRemotePath(), stream);
+    }
+
+    // TODO replace with an API call to the registry
+    private SSHResourceIdentifier getSSHResourceIdentifier(String resourceId) {
+        SSHResourceIdentifier identifier = new SSHResourceIdentifier();
+        identifier.setHost("pgadev.scigap.org");
+        identifier.setUser("pga");
+        identifier.setPort(22);
+        identifier.setKeyFile("/Users/user/.ssh/id_rsa");
+        identifier.setKeyPassphrase(null);
+        identifier.setRemotePath("/var/www/portals/gateway-user-data/dev-seagrid/eromads6/DefaultProject/Gaussian_C11470169729/file.txt");
+        return identifier;
+    }
+
+    /**
+     * Runs {@code scp -f} for {@code from} on the remote side and copies every file it offers into the
+     * stream. The channel and the session are always disconnected, also when the transfer fails. The
+     * stream is only marked as completed when the transfer succeeds.
+     */
+    private void transferRemoteToStream(Session session, String from, TransportStream stream) throws JSchException, IOException {
+        Channel channel = null;
+        try {
+            // exec 'scp -f rfile' remotely
+            // NOTE(review): the path is concatenated unquoted; confirm it can never contain shell
+            // metacharacters once it comes from the registry.
+            String command = "scp -f " + from;
+            channel = session.openChannel("exec");
+            ((ChannelExec) channel).setCommand(command);
+
+            // get I/O streams for remote scp
+            OutputStream out = channel.getOutputStream();
+            InputStream in = channel.getInputStream();
+
+            channel.connect();
+
+            byte[] buf = new byte[1024];
+
+            sendAck(out);
+
+            while (true) {
+                int c = checkAck(in);
+                if (c == 1 || c == 2) {
+                    // checkAck has already printed the remote message.
+                    throw new IOException("Remote scp reported an error (status " + c + ") for " + from);
+                }
+                if (c != 'C') {
+                    break;
+                }
+
+                // skip the file mode, e.g. '0644 '
+                readFully(in, buf, 5);
+
+                long filesize = readFileSize(in);
+                String file = readFileName(in);
+
+                System.out.println("file-size=" + filesize + ", file=" + file);
+                stream.setLength(filesize);
+
+                sendAck(out);
+
+                copyContent(in, stream.getOutputStream(), buf, filesize);
+
+                if (checkAck(in) != 0) {
+                    throw new IOException("Remote scp did not confirm the end of " + file);
+                }
+
+                sendAck(out);
+            }
+
+            stream.getStreamCompleted().set(true);
+        } finally {
+            if (channel != null) {
+                channel.disconnect();
+            }
+            session.disconnect();
+        }
+    }
+
+    /**
+     * Reads one scp status byte. For status 1 (error) and 2 (fatal error) the message line that follows
+     * is read and printed to standard output.
+     *
+     * @param in stream connected to the remote scp process
+     * @return the status byte: 0 for success, 1 for error, 2 for fatal error, -1 on end of stream, or
+     *         any other byte that was read
+     * @throws IOException if reading fails
+     */
+    public int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+        if (b == 1 || b == 2) {
+            StringBuilder message = new StringBuilder();
+            int c;
+            // Stop at end of line, and also at end of stream so a truncated message cannot loop forever.
+            while ((c = in.read()) != -1) {
+                message.append((char) c);
+                if (c == '\n') {
+                    break;
+                }
+            }
+            System.out.print(message.toString());
+        }
+        return b;
+    }
+
+    /** Sends the single zero byte the scp protocol uses as an acknowledgement. */
+    private static void sendAck(OutputStream out) throws IOException {
+        out.write(new byte[] {0}, 0, 1);
+        out.flush();
+    }
+
+    /** Reads exactly {@code len} bytes into {@code buf}, failing if the stream ends first. */
+    private static void readFully(InputStream in, byte[] buf, int len) throws IOException {
+        int offset = 0;
+        while (offset < len) {
+            int n = in.read(buf, offset, len - offset);
+            if (n < 0) {
+                throw new EOFException("Remote scp closed the stream inside the file header");
+            }
+            offset += n;
+        }
+    }
+
+    /** Parses the decimal file size that is terminated by a single space. */
+    private static long readFileSize(InputStream in) throws IOException {
+        long size = 0L;
+        while (true) {
+            int b = in.read();
+            if (b < 0) {
+                throw new EOFException("Remote scp closed the stream inside the file size");
+            }
+            if (b == ' ') {
+                return size;
+            }
+            if (b < '0' || b > '9') {
+                throw new IOException("Malformed scp header: unexpected byte " + b + " in the file size");
+            }
+            size = size * 10L + (b - '0');
+        }
+    }
+
+    /** Reads the file name that is terminated by a line feed; names of any length are accepted. */
+    private static String readFileName(InputStream in) throws IOException {
+        ByteArrayOutputStream name = new ByteArrayOutputStream();
+        while (true) {
+            int b = in.read();
+            if (b < 0) {
+                throw new EOFException("Remote scp closed the stream inside the file name");
+            }
+            if (b == 0x0a) {
+                return name.toString();
+            }
+            name.write(b);
+        }
+    }
+
+    /** Copies exactly {@code filesize} bytes from {@code in} to {@code sink}, flushing after each chunk. */
+    private static void copyContent(InputStream in, OutputStream sink, byte[] buf, long filesize) throws IOException {
+        long remaining = filesize;
+        while (remaining > 0L) {
+            int wanted = (int) Math.min((long) buf.length, remaining);
+            int n = in.read(buf, 0, wanted);
+            if (n < 0) {
+                throw new EOFException("Remote scp closed the stream with " + remaining + " bytes still expected");
+            }
+            sink.write(buf, 0, n);
+            sink.flush();
+            remaining -= n;
+        }
+    }
+
+    /**
+     * Opens and connects an SSH session, optionally authenticating with a private key.
+     *
+     * @throws JSchException if the key cannot be loaded or the connection cannot be established
+     */
+    private static Session createSession(String user, String host, int port, String keyFilePath, String keyPassword) throws JSchException {
+        JSch jsch = new JSch();
+
+        if (keyFilePath != null) {
+            if (keyPassword != null) {
+                jsch.addIdentity(keyFilePath, keyPassword);
+            } else {
+                jsch.addIdentity(keyFilePath);
+            }
+        }
+
+        Properties config = new Properties();
+        // NOTE(review): this turns host key verification off, which allows man-in-the-middle attacks;
+        // confirm whether a known_hosts file should be configured instead.
+        config.put("StrictHostKeyChecking", "no");
+
+        Session session = jsch.getSession(user, host, port);
+        session.setConfig(config);
+        session.connect();
+
+        return session;
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
new file mode 100644
index 0000000..64e2210
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -0,0 +1,11 @@
+package org.apache.airavata.mft.transport.scp;
+
+import org.apache.airavata.mft.core.api.StreamedSender;
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+/**
+ * SCP implementation of {@link StreamedSender}. Sending is not implemented yet.
+ */
+public class SCPSender implements StreamedSender {
+
+    /**
+     * Not implemented yet. Fails loudly so that a caller cannot mistake the missing implementation for
+     * a successful transfer.
+     *
+     * @param resourceIdentifier identifier of the destination resource (unused)
+     * @param stream             stream holding the data to send (unused)
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void send(String resourceIdentifier, TransportStream stream) throws Exception {
+        throw new UnsupportedOperationException("SCP sending is not implemented yet");
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SSHResourceIdentifier.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SSHResourceIdentifier.java
new file mode 100644
index 0000000..6335b7c
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SSHResourceIdentifier.java
@@ -0,0 +1,67 @@
+package org.apache.airavata.mft.transport.scp;
+
+/**
+ * Mutable holder for what is needed to reach a file over SSH: host, port, user, key material and the
+ * remote path, plus an identifier for the resource itself. All fields start out unset ({@code null},
+ * or {@code 0} for the port).
+ */
+public class SSHResourceIdentifier {
+
+    /** Identifier of the resource. */
+    private String id;
+    /** Path of the file on the remote host. */
+    private String remotePath;
+    /** Remote host name. */
+    private String host;
+    /** Login user on the remote host. */
+    private String user;
+    /** SSH port on the remote host. */
+    private int port;
+    /** Path to the private key file. */
+    private String keyFile;
+    /** Passphrase of the private key; may be {@code null}. */
+    private String keyPassphrase;
+
+    /** @return the resource identifier */
+    public String getId() {
+        return this.id;
+    }
+
+    /** @param id the resource identifier */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** @return the path of the file on the remote host */
+    public String getRemotePath() {
+        return this.remotePath;
+    }
+
+    /** @param remotePath the path of the file on the remote host */
+    public void setRemotePath(String remotePath) {
+        this.remotePath = remotePath;
+    }
+
+    /** @return the remote host name */
+    public String getHost() {
+        return this.host;
+    }
+
+    /** @param host the remote host name */
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    /** @return the login user */
+    public String getUser() {
+        return this.user;
+    }
+
+    /** @param user the login user */
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /** @return the SSH port */
+    public int getPort() {
+        return this.port;
+    }
+
+    /** @param port the SSH port */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /** @return the path to the private key file */
+    public String getKeyFile() {
+        return this.keyFile;
+    }
+
+    /** @param keyFile the path to the private key file */
+    public void setKeyFile(String keyFile) {
+        this.keyFile = keyFile;
+    }
+
+    /** @return the passphrase of the private key, possibly {@code null} */
+    public String getKeyPassphrase() {
+        return this.keyPassphrase;
+    }
+
+    /** @param keyPassphrase the passphrase of the private key, or {@code null} if it has none */
+    public void setKeyPassphrase(String keyPassphrase) {
+        this.keyPassphrase = keyPassphrase;
+    }
+}