You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/08/22 16:42:14 UTC
[airavata-mft] 08/22: Adding S3 stream receiver
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 360e5ad33c33eb9d400e37321a0708c7abb1b774
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Aug 6 15:47:02 2019 -0400
Adding S3 stream receiver
---
.../streaming/DoubleByteArrayOutputStream.java | 4 ++
transport/pom.xml | 1 +
transport/{scp-transport => s3-transport}/pom.xml | 8 +--
.../airavata/mft/transport/s3/S3Receiver.java | 45 ++++++++++++++
.../mft/transport/s3/S3ResourceIdentifier.java | 68 ++++++++++++++++++++++
.../apache/airavata/mft/transport/s3/S3Sender.java | 30 ++++++++++
.../mft/transport/s3/S3TransportOperator.java | 36 ++++++++++++
.../airavata/mft/transport/s3/S3TransportUtil.java | 55 +++++++++++++++++
transport/scp-transport/pom.xml | 5 ++
.../apache/airavata/mft/transport/scp/Main.java | 12 ++--
10 files changed, 256 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
index 56728c3..799ba42 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/streaming/DoubleByteArrayOutputStream.java
@@ -44,7 +44,11 @@ public class DoubleByteArrayOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
+ if (processedBytes > maxBytesPerStream) { // NOTE(review): with '>' one byte beyond maxBytesPerStream is accepted before swapping, assuming the counter starts at 0 — confirm this matches the bulk write path
+ swapBuffers(); // presumably redirects currentStream to the other buffer and resets processedBytes; swapBuffers() is not visible in this hunk — verify
+ }
this.currentStream.write(b);
+ processedBytes += 1; // account for the single byte just written
}
@Override
diff --git a/transport/pom.xml b/transport/pom.xml
index 3bcbefa..0d4952a 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -34,6 +34,7 @@
<packaging>pom</packaging>
<modules>
<module>scp-transport</module>
+ <module>s3-transport</module>
</modules>
diff --git a/transport/scp-transport/pom.xml b/transport/s3-transport/pom.xml
similarity index 89%
copy from transport/scp-transport/pom.xml
copy to transport/s3-transport/pom.xml
index eccfdca..9395db8 100644
--- a/transport/scp-transport/pom.xml
+++ b/transport/s3-transport/pom.xml
@@ -30,13 +30,13 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>mft-scp-transport</artifactId>
+ <artifactId>mft-s3-transport</artifactId>
<dependencies>
<dependency>
- <groupId>com.jcraft</groupId>
- <artifactId>jsch</artifactId>
- <version>0.1.55</version>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>1.11.163</version>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
new file mode 100644
index 0000000..d7c516a
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.airavata.mft.core.api.StreamedReceiver;
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+public class S3Receiver implements StreamedReceiver {
+
+ @Override
+ public void receive(TransportStream stream) throws Exception {
+
+ S3ResourceIdentifier resourceIdentifier = S3TransportUtil.getS3ResourceIdentifier(stream.getSourceId());
+ AmazonS3 s3client = S3TransportUtil.getS3Client(resourceIdentifier.getAccessKey(),
+ resourceIdentifier.getSecretKey(), resourceIdentifier.getRegion());
+ S3Object s3object = s3client.getObject(resourceIdentifier.getBucket(), resourceIdentifier.getRemoteFile());
+ S3ObjectInputStream inputStream = s3object.getObjectContent();
+
+ int read;
+ while ((read = inputStream.read()) != -1) {
+ stream.getOutputStream().write(read);
+ }
+ stream.setStreamCompleted(true);
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3ResourceIdentifier.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3ResourceIdentifier.java
new file mode 100644
index 0000000..085589d
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3ResourceIdentifier.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+/**
+ * Mutable holder for everything needed to reach a single S3 object: where it lives
+ * (region, bucket, key) and the credentials used to access it.
+ */
+public class S3ResourceIdentifier {
+
+    // Credentials
+    private String accessKey;
+    private String secretKey;
+
+    // Object location
+    private String region;
+    private String bucket;
+    private String remoteFile;
+
+    /** @return the AWS access key */
+    public String getAccessKey() {
+        return this.accessKey;
+    }
+
+    /** @param accessKey the AWS access key */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    /** @return the AWS secret key */
+    public String getSecretKey() {
+        return this.secretKey;
+    }
+
+    /** @param secretKey the AWS secret key */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /** @return the name of the region the bucket belongs to */
+    public String getRegion() {
+        return this.region;
+    }
+
+    /** @param region the name of the region the bucket belongs to */
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    /** @return the bucket name */
+    public String getBucket() {
+        return this.bucket;
+    }
+
+    /** @param bucket the bucket name */
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+
+    /** @return the key of the object inside the bucket */
+    public String getRemoteFile() {
+        return this.remoteFile;
+    }
+
+    /** @param remoteFile the key of the object inside the bucket */
+    public void setRemoteFile(String remoteFile) {
+        this.remoteFile = remoteFile;
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
new file mode 100644
index 0000000..cdeb1e5
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import org.apache.airavata.mft.core.api.StreamedSender;
+import org.apache.airavata.mft.core.streaming.TransportStream;
+
+public class S3Sender implements StreamedSender {
+ @Override
+ public void send(TransportStream stream) throws Exception {
+ // TODO: not implemented yet — this method is currently a no-op and transfers nothing from the stream.
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportOperator.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportOperator.java
new file mode 100644
index 0000000..79db705
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportOperator.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.airavata.mft.core.api.TransportOperator;
+
+/**
+ * {@link TransportOperator} implementation that answers questions about S3 resources.
+ */
+public class S3TransportOperator implements TransportOperator {
+
+    /**
+     * Returns the size in bytes of the S3 object registered under {@code resourceId}.
+     *
+     * @param resourceId id of the resource to look up
+     * @return the content length reported by S3 for the object
+     * @throws IllegalArgumentException if the id does not map to an S3 resource
+     * @throws Exception if the S3 request fails
+     */
+    @Override
+    public long getResourceSize(String resourceId) throws Exception {
+        S3ResourceIdentifier resourceIdentifier = S3TransportUtil.getS3ResourceIdentifier(resourceId);
+        if (resourceIdentifier == null) {
+            // The lookup answers null for ids it does not know; report that explicitly
+            // instead of failing later with a bare NullPointerException.
+            throw new IllegalArgumentException("No S3 resource found for resource id " + resourceId);
+        }
+
+        AmazonS3 s3client = S3TransportUtil.getS3Client(resourceIdentifier.getAccessKey(),
+                resourceIdentifier.getSecretKey(), resourceIdentifier.getRegion());
+
+        // Only the metadata is needed. getObject() would start downloading the content and
+        // leave a stream (and its HTTP connection) open that nobody closes.
+        return s3client
+                .getObjectMetadata(resourceIdentifier.getBucket(), resourceIdentifier.getRemoteFile())
+                .getContentLength();
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportUtil.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportUtil.java
new file mode 100644
index 0000000..ca694bb
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3TransportUtil.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+
+/**
+ * Static helpers shared by the S3 transport classes: resource lookup and client creation.
+ */
+public class S3TransportUtil {
+
+    /**
+     * Looks up the S3 connection details for a resource id. Only the id {@code "3"} is
+     * known; it maps to a fixed set of placeholder values.
+     *
+     * @param resourceId id of the resource; must not be {@code null}
+     * @return the populated identifier, or {@code null} when the id is not known
+     */
+    public static S3ResourceIdentifier getS3ResourceIdentifier(String resourceId) {
+        // Called on resourceId itself so that a null id raises a NullPointerException.
+        if (!resourceId.equals("3")) {
+            return null;
+        }
+
+        S3ResourceIdentifier resource = new S3ResourceIdentifier();
+        resource.setAccessKey("key");
+        resource.setSecretKey("secret");
+        resource.setBucket("airavata-s3");
+        resource.setRegion("us-east-2");
+        resource.setRemoteFile("file.txt");
+        return resource;
+    }
+
+    /**
+     * Builds an S3 client that authenticates with the given static key pair against the
+     * given region.
+     *
+     * @param accessKey AWS access key
+     * @param secretKey AWS secret key
+     * @param region    region name understood by {@link Regions#fromName(String)}
+     * @return a newly built client
+     */
+    public static AmazonS3 getS3Client(String accessKey, String secretKey, String region) {
+        AWSStaticCredentialsProvider credentialsProvider =
+                new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
+
+        return AmazonS3ClientBuilder.standard()
+                .withCredentials(credentialsProvider)
+                .withRegion(Regions.fromName(region))
+                .build();
+    }
+}
diff --git a/transport/scp-transport/pom.xml b/transport/scp-transport/pom.xml
index eccfdca..d3cbc4e 100644
--- a/transport/scp-transport/pom.xml
+++ b/transport/scp-transport/pom.xml
@@ -43,6 +43,11 @@
<artifactId>mft-core</artifactId>
<version>0.18-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>mft-s3-transport</artifactId>
+ <version>0.18-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
index 3dbeae6..00fe32e 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/Main.java
@@ -21,6 +21,8 @@ package org.apache.airavata.mft.transport.scp;
import org.apache.airavata.mft.core.streaming.TransportMetadata;
import org.apache.airavata.mft.core.streaming.TransportStream;
+import org.apache.airavata.mft.transport.s3.S3Receiver;
+import org.apache.airavata.mft.transport.s3.S3TransportOperator;
import java.io.IOException;
@@ -28,9 +30,10 @@ public class Main {
public static void main(final String[] arg) throws Exception {
TransportMetadata metadata = new TransportMetadata();
- SCPTransportOperator operator = new SCPTransportOperator();
- metadata.setLength(operator.getResourceSize("1"));
- final TransportStream stream = new TransportStream("1", "2", metadata);
+ //SCPTransportOperator operator = new SCPTransportOperator();
+ S3TransportOperator operator = new S3TransportOperator();
+ metadata.setLength(operator.getResourceSize("3"));
+ final TransportStream stream = new TransportStream("3", "2", metadata);
Runnable r1 = new Runnable() {
@Override
@@ -66,7 +69,8 @@ public class Main {
Runnable receiverRun = new Runnable() {
@Override
public void run() {
- SCPReceiver receiver = new SCPReceiver();
+ //SCPReceiver receiver = new SCPReceiver();
+ S3Receiver receiver = new S3Receiver();
try {
receiver.receive(stream);
} catch (Exception e) {