You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/04/06 20:22:21 UTC
[airavata-mft] branch master updated: Adding S3 resource support
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 62fae3d Adding S3 resource support
62fae3d is described below
commit 62fae3d0ab2921fa8bf0bea7970e233f842e6948
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Apr 6 16:22:07 2020 -0400
Adding S3 resource support
---
agent/pom.xml | 5 +
.../airavata/mft/core/CircularStreamingBuffer.java | 31 +----
.../airavata/mft/core/ConnectorResolver.java | 10 ++
.../mft/core/MetadataCollectorResolver.java | 3 +
.../resource/server/backend/ResourceBackend.java | 5 +
.../backend/airavata/AiravataResourceBackend.java | 53 ++++++--
.../backend/file/FileBasedResourceBackend.java | 77 +++++++++--
.../server/backend/sql/SQLResourceBackend.java | 24 ++++
.../server/handler/ResourceServiceHandler.java | 150 ++++++++++++++++++---
.../resources/distribution/conf/resources.json | 7 +
.../server/src/main/resources/resources.json | 7 +
.../stub/src/main/proto/ResourceService.proto | 55 ++++++++
.../mft/secret/server/backend/SecretBackend.java | 6 +
.../backend/airavata/AiravataSecretBackend.java | 27 +++-
.../backend/file/FileBasedSecretBackend.java | 48 ++++++-
.../server/backend/sql/SQLSecretBackend.java | 20 +++
.../server/handler/SecretServiceHandler.java | 79 ++++++++++-
.../main/resources/distribution/conf/secrets.json | 6 +
.../server/src/main/resources/secrets.json | 6 +
.../stub/src/main/proto/SecretService.proto | 54 ++++++++
transport/pom.xml | 1 +
transport/{ => s3-transport}/pom.xml | 21 ++-
.../mft/transport/s3/S3MetadataCollector.java | 104 ++++++++++++++
.../airavata/mft/transport/s3/S3Receiver.java | 91 +++++++++++++
.../apache/airavata/mft/transport/s3/S3Sender.java | 77 +++++++++++
.../mft/transport/scp/SSHResourceIdentifier.java | 85 ------------
26 files changed, 879 insertions(+), 173 deletions(-)
diff --git a/agent/pom.xml b/agent/pom.xml
index e6e01b8..cd00e66 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -45,6 +45,11 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
+ <artifactId>mft-s3-transport</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
<artifactId>mft-admin</artifactId>
<version>0.01-SNAPSHOT</version>
</dependency>
diff --git a/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java b/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
index 9086626..880c0bc 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/CircularStreamingBuffer.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Semaphore;
public class CircularStreamingBuffer {
- private int bufferSize = 1024;
+ private int bufferSize = 18192;
private ArrayBlockingQueue<Byte> buffer = new ArrayBlockingQueue<>(bufferSize);
private boolean osClosed = false;
@@ -91,7 +91,9 @@ public class CircularStreamingBuffer {
@Override
public int read() throws IOException {
+ //System.out.println("Read int");
Byte res = buffer.poll();
+ //System.out.println("Done read int");
if (res == null) {
//System.out.println("Received null in is.read()");
if (osClosed) return -1;
@@ -107,32 +109,5 @@ public class CircularStreamingBuffer {
return res & 0xff;
}
}
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- for (int i = off; i < len; i++) {
- //System.out.println("Begin read");
- int res = read();
- //System.out.println("End read " + res);
- if (res == -1) {
- //System.out.println("Received -1 in is.read(byte[], off, len)");
- if (i == off) {
- //System.out.println("Return -1");
- return -1;
- } else {
- //System.out.println("Return " + (i - off));
- return i - off;
- }
- } else {
- b[i] = (byte)res;
- }
- }
- return len - off;
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
}
}
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index 691993c..3f76d07 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -47,6 +47,16 @@ public final class ConnectorResolver {
break;
}
break;
+ case "S3":
+ switch (direction) {
+ case "IN":
+ className = "org.apache.airavata.mft.transport.s3.S3Receiver";
+ break;
+ case "OUT":
+ className = "org.apache.airavata.mft.transport.s3.S3Sender";
+ break;
+ }
+ break;
}
if (className != null) {
diff --git a/core/src/main/java/org/apache/airavata/mft/core/MetadataCollectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/MetadataCollectorResolver.java
index 47e36cb..653e56d 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/MetadataCollectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/MetadataCollectorResolver.java
@@ -33,6 +33,9 @@ public final class MetadataCollectorResolver {
case "LOCAL":
className = "org.apache.airavata.mft.transport.local.LocalMetadataCollector";
break;
+ case "S3":
+ className = "org.apache.airavata.mft.transport.s3.S3MetadataCollector";
+ break;
}
if (className != null) {
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
index 93a61db..6857bde 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
@@ -41,4 +41,9 @@ public interface ResourceBackend {
public LocalResource createLocalResource(LocalResourceCreateRequest request) throws Exception;
public boolean updateLocalResource(LocalResourceUpdateRequest request) throws Exception;
public boolean deleteLocalResource(LocalResourceDeleteRequest request) throws Exception;
+
+ public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception;
+ public S3Resource createS3Resource(S3ResourceCreateRequest request) throws Exception;
+ public boolean updateS3Resource(S3ResourceUpdateRequest request) throws Exception;
+ public boolean deleteS3Resource(S3ResourceDeleteRequest request) throws Exception;
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
index 6b301a2..f888134 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
@@ -97,17 +97,19 @@ public class AiravataResourceBackend implements ResourceBackend {
@Override
public SCPStorage createSCPStorage(SCPStorageCreateRequest request) {
- return null;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
@Override
public boolean updateSCPStorage(SCPStorageUpdateRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean deleteSCPStorage(SCPStorageDeleteRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
@@ -126,36 +128,67 @@ public class AiravataResourceBackend implements ResourceBackend {
@Override
public SCPResource createSCPResource(SCPResourceCreateRequest request) {
- return null;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean updateSCPResource(SCPResourceUpdateRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean deleteSCPResource(SCPResourceDeleteRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) {
- return Optional.empty();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public LocalResource createLocalResource(LocalResourceCreateRequest request) {
- return null;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean updateLocalResource(LocalResourceUpdateRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean deleteLocalResource(LocalResourceDeleteRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public S3Resource createS3Resource(S3ResourceCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public boolean updateS3Resource(S3ResourceUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public boolean deleteS3Resource(S3ResourceDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
index 86cad51..cb504b3 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
@@ -50,22 +50,25 @@ public class FileBasedResourceBackend implements ResourceBackend {
@Override
public Optional<SCPStorage> getSCPStorage(SCPStorageGetRequest request) throws Exception {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
@Override
public SCPStorage createSCPStorage(SCPStorageCreateRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean updateSCPStorage(SCPStorageUpdateRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean deleteSCPStorage(SCPStorageDeleteRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
@@ -107,17 +110,20 @@ public class FileBasedResourceBackend implements ResourceBackend {
@Override
public SCPResource createSCPResource(SCPResourceCreateRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean updateSCPResource(SCPResourceUpdateRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean deleteSCPResource(SCPResourceDeleteRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
@@ -148,16 +154,67 @@ public class FileBasedResourceBackend implements ResourceBackend {
@Override
public LocalResource createLocalResource(LocalResourceCreateRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean updateLocalResource(LocalResourceUpdateRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
@Override
public boolean deleteLocalResource(LocalResourceDeleteRequest request) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
+ JSONParser jsonParser = new JSONParser();
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray resourceList = (JSONArray) obj;
+
+ System.out.println("All resources ");
+ List<S3Resource> s3Resources = (List<S3Resource>) resourceList.stream()
+ .filter(resource -> "S3".equals(((JSONObject) resource).get("type").toString()))
+ .map(resource -> {
+ JSONObject r = (JSONObject) resource;
+
+ S3Resource s3Resource = S3Resource.newBuilder()
+ .setResourcePath(r.get("resourcePath").toString())
+ .setResourceId(r.get("resourceId").toString())
+ .setBucketName(r.get("bucketName").toString())
+ .setRegion(r.get("region").toString())
+ .build();
+
+ return s3Resource;
+ }).collect(Collectors.toList());
+ return s3Resources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
+ }
+
+ }
+
+ @Override
+ public S3Resource createS3Resource(S3ResourceCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public boolean updateS3Resource(S3ResourceUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public boolean deleteS3Resource(S3ResourceDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
}
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
index de84e29..a02d5a9 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
@@ -131,4 +131,28 @@ public class SQLResourceBackend implements ResourceBackend {
localResourceRepository.deleteById(request.getResourceId());
return true;
}
+
+ @Override
+ public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public S3Resource createS3Resource(S3ResourceCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public boolean updateS3Resource(S3ResourceUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
+
+ @Override
+ public boolean deleteS3Resource(S3ResourceDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+
+ }
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/ResourceServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/ResourceServiceHandler.java
index dcc75bd..16bd850 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/ResourceServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/ResourceServiceHandler.java
@@ -34,6 +34,7 @@
package org.apache.airavata.mft.resource.server.handler;
import com.google.protobuf.Empty;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.service.*;
@@ -42,6 +43,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.Optional;
+
@GRpcService
public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceImplBase {
@@ -57,11 +60,17 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onNext(storage);
responseObserver.onCompleted();
}, () -> {
- responseObserver.onError(new Exception("No SCP Storage with id " + request.getStorageId()));
+
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No SCP Storage with id " + request.getStorageId())
+ .asRuntimeException());
});
} catch (Exception e) {
logger.error("Failed in retrieving storage with id " + request.getStorageId(), e);
- responseObserver.onError(new Exception("Failed in retrieving storage with id " + request.getStorageId()));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving storage with id " + request.getStorageId())
+ .asRuntimeException());
}
}
@@ -72,7 +81,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed in creating the scp storage", e);
- responseObserver.onError(new Exception("Failed in creating the scp storage", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the scp storage")
+ .asRuntimeException());
}
}
@@ -83,7 +95,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed in updating the scp storage {}", request.getStorageId(), e);
- responseObserver.onError(new Exception("Failed in updating the scp storage", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the scp storage")
+ .asRuntimeException());
}
}
@@ -97,11 +112,17 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} else {
logger.error("Failed to delete SCP Storage with id " + request.getStorageId());
- responseObserver.onError(new Exception("Failed to delete SCP Storage with id " + request.getStorageId()));
+
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("Failed to delete SCP Storage with id " + request.getStorageId())
+ .asRuntimeException());
}
} catch (Exception e) {
logger.error("Failed in deleting the scp storage {}", request.getStorageId(), e);
- responseObserver.onError(new Exception("Failed in deleting the scp storage", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the scp storage")
+ .asRuntimeException());
}
}
@@ -112,11 +133,17 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onNext(resource);
responseObserver.onCompleted();
}, () -> {
- responseObserver.onError(new Exception("No SCP Resource with id " + request.getResourceId()));
+
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No SCP Resource with id " + request.getResourceId())
+ .asRuntimeException());
});
} catch (Exception e) {
logger.error("Failed in retrieving resource with id {}", request.getResourceId(), e);
- responseObserver.onError(new Exception("Failed in retrieving resource with id " + request.getResourceId()));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving resource with id " + request.getResourceId())
+ .asRuntimeException());
}
}
@@ -127,7 +154,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed in creating the scp resource", e);
- responseObserver.onError(new Exception("Failed in creating the scp resource", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the scp resource")
+ .asRuntimeException());
}
}
@@ -138,7 +168,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed in updating the scp resource {}", request.getResourceId(), e);
- responseObserver.onError(new Exception("Failed in updating the scp resource", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the scp resource")
+ .asRuntimeException());
}
}
@@ -149,11 +182,17 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
if (res) {
responseObserver.onCompleted();
} else {
- responseObserver.onError(new Exception("Failed to delete SCP Resource with id " + request.getResourceId()));
+
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("Failed to delete SCP Resource with id " + request.getResourceId())
+ .asRuntimeException());
}
} catch (Exception e) {
logger.error("Failed in deleting the scp resource {}", request.getResourceId(), e);
- responseObserver.onError(new Exception("Failed in deleting the scp resource", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the scp resource")
+ .asRuntimeException());
}
}
@@ -165,11 +204,16 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onNext(resource);
responseObserver.onCompleted();
}, () -> {
- responseObserver.onError(new Exception("No Local Resource with id " + request.getResourceId()));
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No Local Resource with id " + request.getResourceId())
+ .asRuntimeException());
});
} catch (Exception e) {
logger.error("Failed in retrieving resource with id {}", request.getResourceId(), e);
- responseObserver.onError(new Exception("Failed in retrieving resource with id " + request.getResourceId()));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving resource with id " + request.getResourceId())
+ .asRuntimeException());
}
}
@@ -180,7 +224,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed in creating the local resource", e);
- responseObserver.onError(new Exception("Failed in creating the local resource", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the local resource")
+ .asRuntimeException());
}
}
@@ -191,7 +238,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed in updating the local resource {}", request.getResourceId(), e);
- responseObserver.onError(new Exception("Failed in updating the local resource", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the local resource with id " + request.getResourceId())
+ .asRuntimeException());
}
}
@@ -206,7 +256,73 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
}
} catch (Exception e) {
logger.error("Failed in deleting the local resource {}", request.getResourceId(), e);
- responseObserver.onError(new Exception("Failed in deleting the local resource", e));
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the local resource with id " + request.getResourceId())
+ .asRuntimeException());
}
}
+
+ @Override
+ public void getS3Resource(S3ResourceGetRequest request, StreamObserver<S3Resource> responseObserver) {
+ try {
+ this.backend.getS3Resource(request).ifPresentOrElse(resource -> {
+ responseObserver.onNext(resource);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No S3 Resource with id " + request.getResourceId())
+ .asRuntimeException());
+ });
+ } catch (Exception e) {
+ logger.error("Failed in retrieving S3 resource with id {}", request.getResourceId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving S3 resource with id " + request.getResourceId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void createS3Resource(S3ResourceCreateRequest request, StreamObserver<S3Resource> responseObserver) {
+ try {
+ responseObserver.onNext(this.backend.createS3Resource(request));
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in creating the S3 resource", e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the S3 resource")
+ .asRuntimeException());
+ } }
+
+ @Override
+ public void updateS3Resource(S3ResourceUpdateRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ this.backend.updateS3Resource(request);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in updating the S3 resource {}", request.getResourceId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the S3 resource with id " + request.getResourceId())
+ .asRuntimeException());
+ } }
+
+ @Override
+ public void deleteS3Resource(S3ResourceDeleteRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ boolean res = this.backend.deleteS3Resource(request);
+ if (res) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("Failed to delete S3 Resource with id " + request.getResourceId()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed in deleting the S3 resource {}", request.getResourceId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the S3 resource with id " + request.getResourceId())
+ .asRuntimeException());
+ } }
}
diff --git a/services/resource-service/server/src/main/resources/distribution/conf/resources.json b/services/resource-service/server/src/main/resources/distribution/conf/resources.json
index b714976..0791f2c 100644
--- a/services/resource-service/server/src/main/resources/distribution/conf/resources.json
+++ b/services/resource-service/server/src/main/resources/distribution/conf/resources.json
@@ -25,5 +25,12 @@
"type": "LOCAL",
"resourceId": "10mb-file",
"resourcePath": "/tmp/1mb.txt"
+ },
+ {
+ "type": "S3",
+ "resourceId": "s3-file",
+ "resourcePath": "new-file.txt",
+ "region": "us-east-2",
+ "bucketName": "s3-bucket"
}
]
\ No newline at end of file
diff --git a/services/resource-service/server/src/main/resources/resources.json b/services/resource-service/server/src/main/resources/resources.json
index b714976..0791f2c 100644
--- a/services/resource-service/server/src/main/resources/resources.json
+++ b/services/resource-service/server/src/main/resources/resources.json
@@ -25,5 +25,12 @@
"type": "LOCAL",
"resourceId": "10mb-file",
"resourcePath": "/tmp/1mb.txt"
+ },
+ {
+ "type": "S3",
+ "resourceId": "s3-file",
+ "resourcePath": "new-file.txt",
+ "region": "us-east-2",
+ "bucketName": "s3-bucket"
}
]
\ No newline at end of file
diff --git a/services/resource-service/stub/src/main/proto/ResourceService.proto b/services/resource-service/stub/src/main/proto/ResourceService.proto
index 1b4cbe9..42af6b9 100644
--- a/services/resource-service/stub/src/main/proto/ResourceService.proto
+++ b/services/resource-service/stub/src/main/proto/ResourceService.proto
@@ -87,6 +87,36 @@ message LocalResourceDeleteRequest {
string resourceId = 1;
}
+// S3 Resource
+
+message S3Resource {
+ string resourceId = 1;
+ string bucketName = 2;
+ string region = 3;
+ string resourcePath = 4;
+}
+
+message S3ResourceGetRequest {
+ string resourceId = 1;
+}
+
+message S3ResourceCreateRequest {
+ string bucketName = 1;
+ string region = 2;
+ string resourcePath = 3;
+}
+
+message S3ResourceUpdateRequest {
+ string resourceId = 1;
+ string bucketName = 2;
+ string region = 3;
+ string resourcePath = 4;
+}
+
+message S3ResourceDeleteRequest {
+ string resourceId = 1;
+}
+
service ResourceService {
// SCP Storage
@@ -166,4 +196,29 @@ service ResourceService {
};
}
+ // S3 Resource
+
+ rpc getS3Resource (S3ResourceGetRequest) returns (S3Resource) {
+ option (google.api.http) = {
+ get: "/v1.0/resource/s3"
+ };
+ }
+
+ rpc createS3Resource (S3ResourceCreateRequest) returns (S3Resource) {
+ option (google.api.http) = {
+ post: "/v1.0/resource/s3"
+ };
+ }
+
+ rpc updateS3Resource (S3ResourceUpdateRequest) returns (google.protobuf.Empty) {
+ option (google.api.http) = {
+ put: "/v1.0/resource/s3"
+ };
+ }
+
+ rpc deleteS3Resource (S3ResourceDeleteRequest) returns (google.protobuf.Empty) {
+ option (google.api.http) = {
+ delete: "/v1.0/resource/s3"
+ };
+ }
}
\ No newline at end of file
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/SecretBackend.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/SecretBackend.java
index 999215d..3172387 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/SecretBackend.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/SecretBackend.java
@@ -25,8 +25,14 @@ public interface SecretBackend {
public void init();
public void destroy();
+
public Optional<SCPSecret> getSCPSecret(SCPSecretGetRequest request) throws Exception;
public SCPSecret createSCPSecret(SCPSecretCreateRequest request);
public boolean updateSCPSecret(SCPSecretUpdateRequest request);
public boolean deleteSCPSecret(SCPSecretDeleteRequest request);
+
+ public Optional<S3Secret> getS3Secret(S3SecretGetRequest request) throws Exception;
+ public S3Secret createS3Secret(S3SecretCreateRequest request) throws Exception;
+ public boolean updateS3Secret(S3SecretUpdateRequest request) throws Exception;
+ public boolean deleteS3Secret(S3SecretDeleteRequest request) throws Exception;
}
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/airavata/AiravataSecretBackend.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/airavata/AiravataSecretBackend.java
index 2fce6ca..a224c0a 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/airavata/AiravataSecretBackend.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/airavata/AiravataSecretBackend.java
@@ -66,16 +66,37 @@ public class AiravataSecretBackend implements SecretBackend {
@Override
public SCPSecret createSCPSecret(SCPSecretCreateRequest request) {
- return null;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
@Override
public boolean updateSCPSecret(SCPSecretUpdateRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
@Override
public boolean deleteSCPSecret(SCPSecretDeleteRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+
+ /**
+ * S3 secret lookup is not implemented by this backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Optional<S3Secret> getS3Secret(S3SecretGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret creation is not implemented by this backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public S3Secret createS3Secret(S3SecretCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret update is not implemented by this backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public boolean updateS3Secret(S3SecretUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret deletion is not implemented by this backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public boolean deleteS3Secret(S3SecretDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
}
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/file/FileBasedSecretBackend.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/file/FileBasedSecretBackend.java
index 7b63582..d2d97ad 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/file/FileBasedSecretBackend.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/file/FileBasedSecretBackend.java
@@ -58,7 +58,6 @@ public class FileBasedSecretBackend implements SecretBackend {
JSONArray resourceList = (JSONArray) obj;
- System.out.println("All resources ");
List<SCPSecret> scpSecrets = (List<SCPSecret>) resourceList.stream()
.filter(resource -> "SCP".equals(((JSONObject) resource).get("type").toString()))
.map(resource -> {
@@ -78,16 +77,57 @@ public class FileBasedSecretBackend implements SecretBackend {
@Override
public SCPSecret createSCPSecret(SCPSecretCreateRequest request) {
- return null;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
@Override
public boolean updateSCPSecret(SCPSecretUpdateRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
@Override
public boolean deleteSCPSecret(SCPSecretDeleteRequest request) {
- return false;
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+    /**
+     * Reads the JSON secret file from the classpath and returns the S3 secret whose id
+     * matches the one in the request.
+     *
+     * @param request request carrying the id of the secret to look up
+     * @return the matching secret, or an empty {@link Optional} when no entry of type
+     *         {@code "S3"} has that id
+     * @throws IllegalStateException if the secret file is not present on the classpath
+     * @throws Exception if the file cannot be read or parsed
+     */
+    @Override
+    public Optional<S3Secret> getS3Secret(S3SecretGetRequest request) throws Exception {
+        InputStream inputStream = FileBasedSecretBackend.class.getClassLoader().getResourceAsStream(secretFile);
+        if (inputStream == null) {
+            // getResourceAsStream reports a missing resource with null rather than an exception.
+            throw new IllegalStateException("Secret file " + secretFile + " was not found in the classpath");
+        }
+
+        // Decode explicitly as UTF-8 so parsing does not depend on the platform default charset.
+        try (InputStreamReader reader = new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8)) {
+            JSONArray secretList = (JSONArray) new JSONParser().parse(reader);
+
+            for (Object entry : secretList) {
+                JSONObject json = (JSONObject) entry;
+
+                // Entries of other types (or without a type) are not S3 secrets.
+                if (!"S3".equals(String.valueOf(json.get("type")))) {
+                    continue;
+                }
+
+                Object secretId = json.get("secretId");
+                if (secretId == null || !request.getSecretId().equals(secretId.toString())) {
+                    continue;
+                }
+
+                // Only the matching entry is converted; the first match wins.
+                return Optional.of(S3Secret.newBuilder()
+                        .setSecretId(secretId.toString())
+                        .setAccessKey(json.get("accessKey").toString())
+                        .setSecretKey(json.get("secretKey").toString())
+                        .build());
+            }
+            return Optional.empty();
+        }
+    }
+
+ /**
+ * S3 secret creation is not implemented by the file based backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public S3Secret createS3Secret(S3SecretCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret update is not implemented by the file based backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public boolean updateS3Secret(S3SecretUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret deletion is not implemented by the file based backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public boolean deleteS3Secret(S3SecretDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
}
}
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/sql/SQLSecretBackend.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/sql/SQLSecretBackend.java
index 1eab678..79da582 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/sql/SQLSecretBackend.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/sql/SQLSecretBackend.java
@@ -70,4 +70,24 @@ public class SQLSecretBackend implements SecretBackend {
secretRepository.deleteById(request.getSecretId());
return true;
}
+
+ /**
+ * S3 secret lookup is not implemented by the SQL backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Optional<S3Secret> getS3Secret(S3SecretGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret creation is not implemented by the SQL backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public S3Secret createS3Secret(S3SecretCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret update is not implemented by the SQL backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public boolean updateS3Secret(S3SecretUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ /**
+ * S3 secret deletion is not implemented by the SQL backend.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public boolean deleteS3Secret(S3SecretDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
}
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/handler/SecretServiceHandler.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/handler/SecretServiceHandler.java
index d88192a..72a9706 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/handler/SecretServiceHandler.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/handler/SecretServiceHandler.java
@@ -18,15 +18,22 @@
package org.apache.airavata.mft.secret.server.handler;
import com.google.protobuf.Empty;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.mft.secret.server.backend.SecretBackend;
import org.apache.airavata.mft.secret.service.*;
import org.lognet.springboot.grpc.GRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.Optional;
+
@GRpcService
public class SecretServiceHandler extends SecretServiceGrpc.SecretServiceImplBase {
+ private static final Logger logger = LoggerFactory.getLogger(SecretServiceHandler.class);
+
@Autowired
private SecretBackend backend;
@@ -37,11 +44,16 @@ public class SecretServiceHandler extends SecretServiceGrpc.SecretServiceImplBas
responseObserver.onNext(secret);
responseObserver.onCompleted();
}, () -> {
- responseObserver.onError(new Exception("No SCP Secret with id " + request.getSecretId()));
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No SCP Secret with id " + request.getSecretId())
+ .asRuntimeException());
});
} catch (Exception e) {
- e.printStackTrace();
- responseObserver.onError(new Exception("Error in retrieving Secret with id " + request.getSecretId()));
+
+ logger.error("Error in retrieving SCP Secret with id " + request.getSecretId(), e);
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Error in retrieving SCP Secret with id " + request.getSecretId())
+ .asRuntimeException());
}
}
@@ -63,7 +75,66 @@ public class SecretServiceHandler extends SecretServiceGrpc.SecretServiceImplBas
if (res) {
responseObserver.onCompleted();
} else {
- responseObserver.onError(new Exception("Failed to delete SCP Secret with id " + request.getSecretId()));
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("Failed to delete SCP Secret with id " + request.getSecretId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void getS3Secret(S3SecretGetRequest request, StreamObserver<S3Secret> responseObserver) {
+ try {
+ this.backend.getS3Secret(request).ifPresentOrElse(secret -> {
+ responseObserver.onNext(secret);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No S3 Secret with id " + request.getSecretId())
+ .asRuntimeException());
+ });
+
+ } catch (Exception e) {
+ logger.error("Error in retrieving S3 Secret with id " + request.getSecretId(), e);
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Error in retrieving S3 Secret with id " + request.getSecretId())
+ .asRuntimeException());
+ }
+ super.getS3Secret(request, responseObserver);
+ }
+
+    /**
+     * Creates an S3 secret through the configured backend and returns the stored secret.
+     * <p>
+     * The call is finished with an INTERNAL status when the backend throws.
+     *
+     * @param request          request carrying the access key and secret key to store
+     * @param responseObserver observer that receives the created secret or the error status
+     */
+    @Override
+    public void createS3Secret(S3SecretCreateRequest request, StreamObserver<S3Secret> responseObserver) {
+        try {
+            S3Secret created = this.backend.createS3Secret(request);
+            // A unary call must emit its single response and then complete; otherwise the
+            // client waits until its deadline expires.
+            responseObserver.onNext(created);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Error in creating S3 Secret", e);
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Error in creating S3 Secret")
+                    .asRuntimeException());
+        }
+    }
+
+    /**
+     * Updates an S3 secret through the configured backend.
+     * <p>
+     * Replies with an empty message when the backend reports success. The call is finished
+     * with an INTERNAL status when the backend reports failure or throws.
+     *
+     * @param request          request carrying the secret id and the new key values
+     * @param responseObserver observer that receives the empty reply or the error status
+     */
+    @Override
+    public void updateS3Secret(S3SecretUpdateRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            if (this.backend.updateS3Secret(request)) {
+                // A unary call has to emit exactly one response before completing.
+                responseObserver.onNext(Empty.getDefaultInstance());
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Failed to update S3 Secret with id " + request.getSecretId())
+                        .asRuntimeException());
+            }
+        } catch (Exception e) {
+            logger.error("Error in updating S3 Secret with id {}", request.getSecretId(), e);
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Error in updating S3 Secret with id " + request.getSecretId())
+                    .asRuntimeException());
+        }
+    }
+
+    /**
+     * Deletes an S3 secret through the configured backend.
+     * <p>
+     * Replies with an empty message when the backend reports success. The call is finished
+     * with an INTERNAL status when the backend reports failure or throws.
+     *
+     * @param request          request carrying the id of the secret to delete
+     * @param responseObserver observer that receives the empty reply or the error status
+     */
+    @Override
+    public void deleteS3Secret(S3SecretDeleteRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            if (this.backend.deleteS3Secret(request)) {
+                // A unary call has to emit exactly one response before completing.
+                responseObserver.onNext(Empty.getDefaultInstance());
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Failed to delete S3 Secret with id " + request.getSecretId())
+                        .asRuntimeException());
+            }
+        } catch (Exception e) {
+            logger.error("Error in deleting S3 Secret with id {}", request.getSecretId(), e);
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Error in deleting S3 Secret with id " + request.getSecretId())
+                    .asRuntimeException());
         }
     }
}
diff --git a/services/secret-service/server/src/main/resources/distribution/conf/secrets.json b/services/secret-service/server/src/main/resources/distribution/conf/secrets.json
index 7fea31f..fd5af0e 100644
--- a/services/secret-service/server/src/main/resources/distribution/conf/secrets.json
+++ b/services/secret-service/server/src/main/resources/distribution/conf/secrets.json
@@ -6,5 +6,11 @@
"privateKey": "",
"publicKey": "",
"passphrase": ""
+ },
+ {
+ "type": "S3",
+ "secretId": "s3-cred",
+ "accessKey": "",
+ "secretKey": ""
}
]
\ No newline at end of file
diff --git a/services/secret-service/server/src/main/resources/secrets.json b/services/secret-service/server/src/main/resources/secrets.json
index 7fea31f..fd5af0e 100644
--- a/services/secret-service/server/src/main/resources/secrets.json
+++ b/services/secret-service/server/src/main/resources/secrets.json
@@ -6,5 +6,11 @@
"privateKey": "",
"publicKey": "",
"passphrase": ""
+ },
+ {
+ "type": "S3",
+ "secretId": "s3-cred",
+ "accessKey": "",
+ "secretKey": ""
}
]
\ No newline at end of file
diff --git a/services/secret-service/stub/src/main/proto/SecretService.proto b/services/secret-service/stub/src/main/proto/SecretService.proto
index 7b57b44..93dfe3e 100644
--- a/services/secret-service/stub/src/main/proto/SecretService.proto
+++ b/services/secret-service/stub/src/main/proto/SecretService.proto
@@ -42,6 +42,36 @@ message SCPSecretDeleteRequest {
AuthToken authzToken = 2;
}
+// S3 credential record: a secret identifier together with an access key / secret key pair.
+message S3Secret {
+ string secretId = 1;
+ string accessKey = 2;
+ string secretKey = 3;
+}
+
+// Request message of getS3Secret: the id of the secret plus an AuthToken.
+message S3SecretGetRequest {
+ string secretId = 1;
+ AuthToken authzToken = 2;
+}
+
+// Request message of createS3Secret: the key pair to store plus an AuthToken.
+// Carries no secretId field, unlike the get/update/delete requests.
+message S3SecretCreateRequest {
+ string accessKey = 1;
+ string secretKey = 2;
+ AuthToken authzToken = 3;
+}
+
+// Request message of updateS3Secret: the id of the secret, the key pair values and an AuthToken.
+message S3SecretUpdateRequest {
+ string secretId = 1;
+ string accessKey = 2;
+ string secretKey = 3;
+ AuthToken authzToken = 4;
+}
+
+// Request message of deleteS3Secret: the id of the secret plus an AuthToken.
+message S3SecretDeleteRequest {
+ string secretId = 1;
+ AuthToken authzToken = 2;
+}
+
+
service SecretService {
rpc getSCPSecret (SCPSecretGetRequest) returns (SCPSecret) {
option (google.api.http) = {
@@ -66,4 +96,28 @@ service SecretService {
delete: "/v1.0/secret/scp"
};
}
+
+ // Takes an S3SecretGetRequest and returns an S3Secret; exposed over HTTP as GET /v1.0/secret/s3.
+ rpc getS3Secret (S3SecretGetRequest) returns (S3Secret) {
+ option (google.api.http) = {
+ get: "/v1.0/secret/s3"
+ };
+ }
+
+ // Takes an S3SecretCreateRequest and returns an S3Secret; exposed over HTTP as POST /v1.0/secret/s3.
+ rpc createS3Secret (S3SecretCreateRequest) returns (S3Secret) {
+ option (google.api.http) = {
+ post: "/v1.0/secret/s3"
+ };
+ }
+
+ // Takes an S3SecretUpdateRequest and returns an empty message; exposed over HTTP as PUT /v1.0/secret/s3.
+ rpc updateS3Secret (S3SecretUpdateRequest) returns (google.protobuf.Empty) {
+ option (google.api.http) = {
+ put: "/v1.0/secret/s3"
+ };
+ }
+
+ // Takes an S3SecretDeleteRequest and returns an empty message; exposed over HTTP as DELETE /v1.0/secret/s3.
+ rpc deleteS3Secret (S3SecretDeleteRequest) returns (google.protobuf.Empty) {
+ option (google.api.http) = {
+ delete: "/v1.0/secret/s3"
+ };
+ }
}
\ No newline at end of file
diff --git a/transport/pom.xml b/transport/pom.xml
index 27e3cba..e1373da 100755
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>scp-transport</module>
<module>local-transport</module>
+ <module>s3-transport</module>
</modules>
<dependencies>
<dependency>
diff --git a/transport/pom.xml b/transport/s3-transport/pom.xml
old mode 100755
new mode 100644
similarity index 76%
copy from transport/pom.xml
copy to transport/s3-transport/pom.xml
index 27e3cba..9142055
--- a/transport/pom.xml
+++ b/transport/s3-transport/pom.xml
@@ -20,34 +20,31 @@
under the License.
-->
+
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>airavata-mft</artifactId>
+ <artifactId>mft-transport</artifactId>
<groupId>org.apache.airavata</groupId>
<version>0.01-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>mft-transport</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>scp-transport</module>
- <module>local-transport</module>
- </modules>
+ <artifactId>mft-s3-transport</artifactId>
+
<dependencies>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>mft-resource-service-client</artifactId>
+ <artifactId>mft-core</artifactId>
<version>0.01-SNAPSHOT</version>
</dependency>
+
<dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>mft-secret-service-client</artifactId>
- <version>0.01-SNAPSHOT</version>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.11.759</version>
</dependency>
</dependencies>
-
</project>
\ No newline at end of file
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
new file mode 100644
index 0000000..33529e0
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.resource.service.S3Resource;
+import org.apache.airavata.mft.resource.service.S3ResourceGetRequest;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.S3Secret;
+import org.apache.airavata.mft.secret.service.S3SecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+
+public class S3MetadataCollector implements MetadataCollector {
+
+ private String resourceServiceHost;
+ private int resourceServicePort;
+ private String secretServiceHost;
+ private int secretServicePort;
+ boolean initialized = false;
+
+ @Override
+ public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) {
+ this.resourceServiceHost = resourceServiceHost;
+ this.resourceServicePort = resourceServicePort;
+ this.secretServiceHost = secretServiceHost;
+ this.secretServicePort = secretServicePort;
+ this.initialized = true;
+ }
+
+ private void checkInitialized() {
+ if (!initialized) {
+ throw new IllegalStateException("S3 Metadata Collector is not initialized");
+ }
+ }
+
+ @Override
+ public ResourceMetadata getGetResourceMetadata(String resourceId, String credentialToken) throws Exception {
+
+ checkInitialized();
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ S3Resource s3Resource = resourceClient.getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+ SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+ S3Secret s3Secret = secretClient.getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+ AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withRegion(s3Resource.getRegion())
+ .build();
+
+ ResourceMetadata metadata = new ResourceMetadata();
+ ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getResourcePath());
+ metadata.setResourceSize(s3Metadata.getContentLength());
+ metadata.setMd5sum(s3Metadata.getContentMD5());
+ metadata.setUpdateTime(s3Metadata.getLastModified().getTime());
+ metadata.setCreatedTime(s3Metadata.getLastModified().getTime());
+ return metadata;
+ }
+
+ @Override
+ public Boolean isAvailable(String resourceId, String credentialToken) throws Exception {
+
+ checkInitialized();
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ S3Resource s3Resource = resourceClient.getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+ SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+ S3Secret s3Secret = secretClient.getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+ AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withRegion(s3Resource.getRegion())
+ .build();
+
+ return s3Client.doesObjectExist(s3Resource.getBucketName(), s3Resource.getResourcePath());
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
new file mode 100644
index 0000000..7f06802
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.resource.service.S3Resource;
+import org.apache.airavata.mft.resource.service.S3ResourceGetRequest;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.S3Secret;
+import org.apache.airavata.mft.secret.service.S3SecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+public class S3Receiver implements Connector {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3Receiver.class);
+
+ private S3Resource s3Resource;
+ private AmazonS3 s3Client;
+
+ @Override
+ public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ String secretServiceHost, int secretServicePort) throws Exception {
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ this.s3Resource = resourceClient.getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+ SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+ S3Secret s3Secret = secretClient.getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withRegion(s3Resource.getRegion())
+ .build();
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public void startStream(ConnectorContext context) throws Exception {
+
+ logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
+
+ S3Object s3object = s3Client.getObject(s3Resource.getBucketName(), s3Resource.getResourcePath());
+ S3ObjectInputStream inputStream = s3object.getObjectContent();
+
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ //OutputStream os = new FileOutputStream("/tmp/out.txt");
+ int read;
+ long bytes = 0;
+ while ((read = inputStream.read()) != -1) {
+ bytes++;
+ os.write(read);
+ }
+ os.flush();
+ os.close();
+
+ logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
new file mode 100644
index 0000000..3734022
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.resource.service.S3Resource;
+import org.apache.airavata.mft.resource.service.S3ResourceGetRequest;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.S3Secret;
+import org.apache.airavata.mft.secret.service.S3SecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Connector} that uploads the content of the transfer's stream buffer to an S3 object.
+ */
+public class S3Sender implements Connector {
+
+    private static final Logger logger = LoggerFactory.getLogger(S3Sender.class);
+
+    private AmazonS3 s3Client;
+    private S3Resource s3Resource;
+
+    /**
+     * Resolves the target S3 resource and its credentials through the resource and secret
+     * services and builds the S3 client used for the upload.
+     */
+    @Override
+    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+
+        S3ResourceGetRequest resourceRequest = S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build();
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceStub = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        this.s3Resource = resourceStub.getS3Resource(resourceRequest);
+
+        S3SecretGetRequest secretRequest = S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build();
+        SecretServiceGrpc.SecretServiceBlockingStub secretStub = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        S3Secret secret = secretStub.getS3Secret(secretRequest);
+
+        AWSStaticCredentialsProvider credentialsProvider =
+                new AWSStaticCredentialsProvider(new BasicAWSCredentials(secret.getAccessKey(), secret.getSecretKey()));
+
+        this.s3Client = AmazonS3ClientBuilder.standard()
+                .withCredentials(credentialsProvider)
+                .withRegion(this.s3Resource.getRegion())
+                .build();
+    }
+
+    /** Currently a no-op. */
+    @Override
+    public void destroy() {
+
+    }
+
+    /**
+     * Uploads the stream buffer's input stream to the bucket and path of the configured
+     * resource, using the resource size from the context's metadata as content length.
+     */
+    @Override
+    public void startStream(ConnectorContext context) throws Exception {
+
+        logger.info("Starting S3 Sender stream for transfer {}", context.getTransferId());
+        logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
+
+        ObjectMetadata uploadMetadata = new ObjectMetadata();
+        uploadMetadata.setContentLength(context.getMetadata().getResourceSize());
+
+        String bucket = this.s3Resource.getBucketName();
+        String objectKey = this.s3Resource.getResourcePath();
+        s3Client.putObject(bucket, objectKey, context.getStreamBuffer().getInputStream(), uploadMetadata);
+
+        logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SSHResourceIdentifier.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SSHResourceIdentifier.java
deleted file mode 100755
index 31fbac7..0000000
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SSHResourceIdentifier.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.scp;
-
-public class SSHResourceIdentifier {
-
- private String id;
- private String remotePath;
- private String host;
- private String user;
- private int port;
- private String keyFile;
- private String keyPassphrase;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getRemotePath() {
- return remotePath;
- }
-
- public void setRemotePath(String remotePath) {
- this.remotePath = remotePath;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getUser() {
- return user;
- }
-
- public void setUser(String user) {
- this.user = user;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public String getKeyFile() {
- return keyFile;
- }
-
- public void setKeyFile(String keyFile) {
- this.keyFile = keyFile;
- }
-
- public String getKeyPassphrase() {
- return keyPassphrase;
- }
-
- public void setKeyPassphrase(String keyPassphrase) {
- this.keyPassphrase = keyPassphrase;
- }
-}