You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/08/22 16:42:14 UTC

[airavata-mft] 08/22: Adding S3 stream receiver

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit 360e5ad33c33eb9d400e37321a0708c7abb1b774
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Aug 6 15:47:02 2019 -0400

    Adding S3 stream receiver
---
 .../streaming/DoubleByteArrayOutputStream.java     |  4 ++
 transport/pom.xml                                  |  1 +
 transport/{scp-transport => s3-transport}/pom.xml  |  8 +--
 .../airavata/mft/transport/s3/S3Receiver.java      | 45 ++++++++++++++
 .../mft/transport/s3/S3ResourceIdentifier.java     | 68 ++++++++++++++++++++++
 .../apache/airavata/mft/transport/s3/S3Sender.java | 30 ++++++++++
 .../mft/transport/s3/S3TransportOperator.java      | 36 ++++++++++++
 .../airavata/mft/transport/s3/S3TransportUtil.java | 55 +++++++++++++++++
 transport/scp-transport/pom.xml                    |  5 ++
 .../apache/airavata/mft/transport/scp/Main.java    | 12 ++--
 10 files changed, 256 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
index 56728c3..799ba42 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
@@ -44,7 +44,11 @@ public class DoubleByteArrayOutputStream extends OutputStream {
 
     @Override
     public void write(int b) throws IOException {
+        if (processedBytes > maxBytesPerStream) { // NOTE(review): '>' swaps only after the limit is exceeded; confirm '>=' is not intended
+            swapBuffers(); // presumably redirects currentStream to the other buffer; defined outside this hunk
+        }
         this.currentStream.write(b);
+        processedBytes += 1; // one more byte handed to currentStream
     }
 
     @Override
diff --git a/transport/pom.xml b/transport/pom.xml
index 3bcbefa..0d4952a 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -34,6 +34,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>scp-transport</module>
+        <module>s3-transport</module>
     </modules>
 
 
diff --git a/transport/scp-transport/pom.xml b/transport/s3-transport/pom.xml
similarity index 89%
copy from transport/scp-transport/pom.xml
copy to transport/s3-transport/pom.xml
index eccfdca..9395db8 100644
--- a/transport/scp-transport/pom.xml
+++ b/transport/s3-transport/pom.xml
@@ -30,13 +30,13 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>mft-scp-transport</artifactId>
+    <artifactId>mft-s3-transport</artifactId>
 
     <dependencies>
         <dependency>
-            <groupId>com.jcraft</groupId>
-            <artifactId>jsch</artifactId>
-            <version>0.1.55</version>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId> <!-- S3 module only; the aws-java-sdk bundle drags in every AWS service client -->
+            <version>1.11.163</version>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
new file mode 100644
index 0000000..d7c516a
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.airavata.mft.core.api.StreamedReceiver;
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+/** Receiver that copies one S3 object into the output side of a {@link TransportStream}. */
+public class S3Receiver implements StreamedReceiver {
+    /** Reads the object named by {@code stream.getSourceId()} to EOF, then marks the stream completed. */
+    @Override
+    public void receive(TransportStream stream) throws Exception {
+        S3ResourceIdentifier resourceIdentifier = S3TransportUtil.getS3ResourceIdentifier(stream.getSourceId());
+        AmazonS3 s3client = S3TransportUtil.getS3Client(resourceIdentifier.getAccessKey(),
+                resourceIdentifier.getSecretKey(), resourceIdentifier.getRegion());
+        // try-with-resources: an unclosed S3Object keeps its pooled HTTP connection checked out, even on failure.
+        try (S3Object s3object = s3client.getObject(resourceIdentifier.getBucket(), resourceIdentifier.getRemoteFile());
+             S3ObjectInputStream inputStream = s3object.getObjectContent()) {
+            for (int read = inputStream.read(); read != -1; read = inputStream.read()) {
+                stream.getOutputStream().write(read);
+            }
+        }
+        stream.setStreamCompleted(true);
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3ResourceIdentifier.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3ResourceIdentifier.java
new file mode 100644
index 0000000..085589d
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3ResourceIdentifier.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+/** Mutable holder for the location of an S3 object and the key pair used to access it. */
+public class S3ResourceIdentifier {
+    // Where the object lives.
+    private String bucket, remoteFile, region;
+    // Key pair used to reach it.
+    private String accessKey, secretKey;
+    /** @return the bucket name, or {@code null} if none has been set */
+    public String getBucket() {
+        return this.bucket;
+    }
+    /** @param bucket the bucket name to store */
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+    /** @return the remote file (object key), or {@code null} if none has been set */
+    public String getRemoteFile() {
+        return this.remoteFile;
+    }
+    /** @param remoteFile the remote file (object key) to store */
+    public void setRemoteFile(String remoteFile) {
+        this.remoteFile = remoteFile;
+    }
+    /** @return the region name, or {@code null} if none has been set */
+    public String getRegion() {
+        return this.region;
+    }
+    /** @param region the region name to store */
+    public void setRegion(String region) {
+        this.region = region;
+    }
+    /** @return the access key, or {@code null} if none has been set */
+    public String getAccessKey() {
+        return this.accessKey;
+    }
+    /** @param accessKey the access key to store */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+    /** @return the secret key, or {@code null} if none has been set */
+    public String getSecretKey() {
+        return this.secretKey;
+    }
+    /** @param secretKey the secret key to store */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
new file mode 100644
index 0000000..cdeb1e5
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import org.apache.airavata.mft.core.api.StreamedSender;
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+public class S3Sender implements StreamedSender {
+    @Override
+    public void send(TransportStream stream) throws Exception {
+        // No-op in this revision: the stream is ignored and nothing is uploaded. TODO: implement the S3 upload.
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportOperator.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportOperator.java
new file mode 100644
index 0000000..79db705
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportOperator.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.airavata.mft.core.api.TransportOperator;
+
+/** {@link TransportOperator} that answers resource queries against Amazon S3. */
+public class S3TransportOperator implements TransportOperator {
+    /** Returns the content length in bytes of the object for {@code resourceId}, via a metadata-only request (no content stream is opened). */
+    @Override
+    public long getResourceSize(String resourceId) throws Exception {
+        S3ResourceIdentifier resourceIdentifier = S3TransportUtil.getS3ResourceIdentifier(resourceId);
+        AmazonS3 s3client = S3TransportUtil.getS3Client(resourceIdentifier.getAccessKey(),
+                resourceIdentifier.getSecretKey(), resourceIdentifier.getRegion());
+        return s3client.getObjectMetadata(resourceIdentifier.getBucket(), resourceIdentifier.getRemoteFile()).getContentLength();
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportUtil.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportUtil.java
new file mode 100644
index 0000000..ca694bb
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportUtil.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+
+/** Static helpers that resolve S3 resource ids and build S3 clients. */
+public class S3TransportUtil {
+    /** Maps a resource id to its S3 coordinates; only id "3" is known, any other id yields {@code null}. */
+    public static S3ResourceIdentifier getS3ResourceIdentifier(String resourceId) {
+        if (!resourceId.equals("3")) {
+            return null;
+        }
+        // Hard-coded entry for the single known resource id.
+        S3ResourceIdentifier knownResource = new S3ResourceIdentifier();
+        knownResource.setAccessKey("key");
+        knownResource.setSecretKey("secret");
+        knownResource.setBucket("airavata-s3");
+        knownResource.setRegion("us-east-2");
+        knownResource.setRemoteFile("file.txt");
+        return knownResource;
+    }
+
+    /** Builds an S3 client for {@code region} that authenticates with the given static key pair. */
+    public static AmazonS3 getS3Client(String accessKey, String secretKey, String region) {
+        AWSStaticCredentialsProvider credentialsProvider =
+                new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
+        return AmazonS3ClientBuilder.standard()
+                .withCredentials(credentialsProvider)
+                .withRegion(Regions.fromName(region))
+                .build();
+    }
+}
diff --git a/transport/scp-transport/pom.xml b/transport/scp-transport/pom.xml
index eccfdca..d3cbc4e 100644
--- a/transport/scp-transport/pom.xml
+++ b/transport/scp-transport/pom.xml
@@ -43,6 +43,11 @@
             <artifactId>mft-core</artifactId>
             <version>0.18-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-s3-transport</artifactId>
+            <version>0.18-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
index 3dbeae6..00fe32e 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
@@ -21,6 +21,8 @@ package org.apache.airavata.mft.transport.scp;
 
 import org.apache.airavata.mft.core.streaming.TransportMetadata;
 import org.apache.airavata.mft.core.streaming.TransportStream;
+import org.apache.airavata.mft.transport.s3.S3Receiver;
+import org.apache.airavata.mft.transport.s3.S3TransportOperator;
 
 import java.io.IOException;
 
@@ -28,9 +30,10 @@ public class Main {
     public static void main(final String[] arg) throws Exception {
 
         TransportMetadata metadata = new TransportMetadata();
-        SCPTransportOperator operator = new SCPTransportOperator();
-        metadata.setLength(operator.getResourceSize("1"));
-        final TransportStream stream = new TransportStream("1", "2", metadata);
+        //SCPTransportOperator operator = new SCPTransportOperator();
+        S3TransportOperator operator = new S3TransportOperator();
+        metadata.setLength(operator.getResourceSize("3"));
+        final TransportStream stream = new TransportStream("3", "2", metadata);
 
         Runnable r1 = new Runnable() {
             @Override
@@ -66,7 +69,8 @@ public class Main {
         Runnable receiverRun = new Runnable() {
             @Override
             public void run() {
-                SCPReceiver receiver = new SCPReceiver();
+                //SCPReceiver receiver = new SCPReceiver();
+                S3Receiver receiver = new S3Receiver();
                 try {
                     receiver.receive(stream);
                 } catch (Exception e) {