You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/07/08 09:53:32 UTC
[airavata-mft] 02/02: Implementing datalake resource backend
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 29fd382ae36c928d4b52cf6f5d1244c1b0d44130
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jul 8 05:53:19 2021 -0400
Implementing datalake resource backend
---
.../src/main/proto/CredCommon.proto | 6 +
services/resource-service/server/pom.xml | 5 +
.../backend/datalake/DatalakeResourceBackend.java | 331 +++++++++++++++++++++
.../src/main/resources/application.properties | 7 +-
.../src/main/resources/applicationContext.xml | 2 +-
services/resource-service/stub/pom.xml | 11 +
.../airavata/mft/secret/server/AppConfig.java | 24 --
.../server/backend/custos/CustosSecretBackend.java | 19 +-
.../custos/auth/AgentAuthenticationHandler.java | 1 -
.../src/main/resources/applicationContext.xml | 2 +-
.../distribution/conf/applicationContext.xml | 2 +-
11 files changed, 375 insertions(+), 35 deletions(-)
diff --git a/common/mft-common-proto/src/main/proto/CredCommon.proto b/common/mft-common-proto/src/main/proto/CredCommon.proto
index bd7d9b7..f60487d 100644
--- a/common/mft-common-proto/src/main/proto/CredCommon.proto
+++ b/common/mft-common-proto/src/main/proto/CredCommon.proto
@@ -24,6 +24,11 @@ message UserTokenAuth {
string token = 1;
}
+message PasswordAuth {
+ string userName = 1;
+ string password = 2;
+}
+
message AgentAuth {
string token = 1;
string agentId = 2;
@@ -42,5 +47,6 @@ message AuthToken {
UserTokenAuth userTokenAuth = 1;
AgentAuth agentAuth = 2;
DelegateAuth delegateAuth = 3;
+ PasswordAuth passwordAuth = 4;
}
}
\ No newline at end of file
diff --git a/services/resource-service/server/pom.xml b/services/resource-service/server/pom.xml
index 3239b55..3b8712e 100644
--- a/services/resource-service/server/pom.xml
+++ b/services/resource-service/server/pom.xml
@@ -64,6 +64,11 @@
<artifactId>grpc-services</artifactId>
<version>1.25.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>drms-stubs</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
new file mode 100644
index 0000000..d66b08d
--- /dev/null
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.resource.server.backend.datalake;
+
+import com.google.protobuf.Struct;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.storage.ResourceFetchRequest;
+import org.apache.airavata.datalake.drms.storage.ResourceFetchResponse;
+import org.apache.airavata.datalake.drms.storage.ResourceServiceGrpc;
+import org.apache.airavata.datalake.drms.storage.preference.s3.S3StoragePreference;
+import org.apache.airavata.datalake.drms.storage.preference.ssh.SSHStoragePreference;
+import org.apache.airavata.datalake.drms.storage.ssh.SSHStorage;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.PasswordAuth;
+import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
+import org.apache.airavata.mft.resource.stubs.common.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
+import org.apache.airavata.mft.resource.stubs.scp.storage.*;
+import org.apache.custos.clients.CustosClientProvider;
+import org.apache.custos.identity.management.client.IdentityManagementClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+public class DatalakeResourceBackend implements ResourceBackend {
+
+ private static final Logger logger = LoggerFactory.getLogger(DatalakeResourceBackend.class);
+
+ private ManagedChannel channel;
+
+ @org.springframework.beans.factory.annotation.Value("${datalake.backend.custos.client.id}")
+ private String clientId;
+
+ @org.springframework.beans.factory.annotation.Value("${datalake.backend.custos.client.secret}")
+ private String clientSecret;
+
+ @Override
+ public void init() {
+ channel = ManagedChannelBuilder.forAddress("localhost", 7070).usePlaintext().build();
+ }
+
+ @Override
+ public void destroy() {
+ try {
+ channel.shutdown();
+ } catch (Exception e) {
+ logger.error("Failed to gracefully terminate DRMS API channel");
+ }
+ }
+
+ @Override
+ public Optional<GenericResource> getGenericResource(GenericResourceGetRequest request) throws Exception {
+
+ AuthToken authzToken = request.getAuthzToken();
+ PasswordAuth passwordAuth = authzToken.getPasswordAuth();
+ String accessToken = getAccessToken(clientId, clientSecret, passwordAuth.getUserName(), passwordAuth.getPassword());
+
+ if (accessToken == null) {
+ logger.error("Failed to fetch an access token when fetching a resource with id {} to user {}",
+ request.getResourceId(), passwordAuth.getUserName());
+ throw new Exception("\"Failed to fetch an access token when fetching a resource with id " + request.getResourceId() +
+ " to user " + passwordAuth.getUserName());
+ }
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub datalakeResourceStub = ResourceServiceGrpc.newBlockingStub(channel);
+ ResourceFetchResponse resourceFetchResponse = datalakeResourceStub.fetchResource(ResourceFetchRequest.newBuilder()
+ .setAuthToken(DRMSServiceAuthToken.newBuilder().setAccessToken(accessToken).build())
+ .setResourceId(request.getResourceId())
+ .build());
+
+ if (resourceFetchResponse.getResource().getResourceId().isEmpty()) {
+ return Optional.empty();
+ }
+
+ org.apache.airavata.datalake.drms.resource.GenericResource resource = resourceFetchResponse.getResource();
+ GenericResource.Builder resourceBuilder = GenericResource.newBuilder()
+ .setResourceId(resource.getResourceId());
+
+ switch (resource.getType()) {
+ case "FILE":
+ resourceBuilder.setFile(FileResource.newBuilder()
+ .setResourcePath(resource.getResourcePath()).build());
+ break;
+ case "COLLECTION":
+ resourceBuilder.setDirectory(DirectoryResource.newBuilder()
+ .setResourcePath(resource.getResourcePath()).build());
+ break;
+ }
+
+ switch (resource.getStoragePreferenceCase()) {
+ case SSH_PREFERENCE:
+ SSHStoragePreference sshPreference = resource.getSshPreference();
+ SSHStorage storage = sshPreference.getStorage();
+ resourceBuilder.setScpStorage(SCPStorage.newBuilder()
+ .setStorageId(storage.getStorageId()).setHost(storage.getHostName())
+ .setPort(storage.getPort())
+ .setUser(sshPreference.getUserName()).build());
+ break;
+ case S3_PREFERENCE:
+ S3StoragePreference s3Preference = resource.getS3Preference();
+ org.apache.airavata.datalake.drms.storage.s3.S3Storage s3Storage = s3Preference.getStorage();
+ resourceBuilder.setS3Storage(S3Storage.newBuilder()
+ .setStorageId(s3Storage.getStorageId())
+ .setBucketName(s3Storage.getBucketName())
+ .setRegion(s3Storage.getRegion()).build());
+ break;
+ case STORAGEPREFERENCE_NOT_SET:
+ logger.error("No preference registered for the resource {}", request.getResourceId());
+ throw new Exception("No preference registered for the resource " + request.getResourceId());
+ }
+
+ return Optional.of(resourceBuilder.build());
+ }
+
+ @Override
+ public GenericResource createGenericResource(GenericResourceCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateGenericResource(GenericResourceUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteGenericResource(GenericResourceDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<SCPStorage> getSCPStorage(SCPStorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public SCPStorage createSCPStorage(SCPStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateSCPStorage(SCPStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteSCPStorage(SCPStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public Optional<FTPStorage> getFTPStorage(FTPStorageGetRequest request) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public FTPStorage createFTPStorage(FTPStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateFTPStorage(FTPStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteFTPStorage(FTPStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+
+ private String getAccessToken(String clientId, String clientSecret, String userName, String password) {
+ try {
+
+ CustosClientProvider custosClientProvider = new CustosClientProvider.Builder().setServerHost("custos.scigap.org")
+ .setServerPort(31499)
+ .setClientId(clientId)
+ .setClientSec(clientSecret).build();
+
+ IdentityManagementClient identityManagementClient = custosClientProvider.getIdentityManagementClient();
+ Struct struct = identityManagementClient.getToken(null, null, userName, password, null, "password");
+ return struct.getFieldsMap().get("access_token").getStringValue();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ return null;
+ }
+}
diff --git a/services/resource-service/server/src/main/resources/application.properties b/services/resource-service/server/src/main/resources/application.properties
index 3dbfa86..e69ad43 100644
--- a/services/resource-service/server/src/main/resources/application.properties
+++ b/services/resource-service/server/src/main/resources/application.properties
@@ -15,7 +15,7 @@
# limitations under the License.
#
-server.port=8080
+server.port=8089
grpc.port=7002
grpc.enableReflection=true
@@ -25,4 +25,7 @@ airavata.backend.registry.server.port=8970
# Configurations for File Backend
file.backend.resource.file=resources.json
-file.backend.storage.file=storages.json
\ No newline at end of file
+file.backend.storage.file=storages.json
+
+datalake.backend.custos.client.id=custos-whedmgamitu357p4wuke-10002708
+datalake.backend.custos.client.secret=mrMdl86Ia1H94cikW7CvHoh7L0ASNXQVt2aRzSIj
\ No newline at end of file
diff --git a/services/resource-service/server/src/main/resources/applicationContext.xml b/services/resource-service/server/src/main/resources/applicationContext.xml
index ea845a3..2fbd7eb 100644
--- a/services/resource-service/server/src/main/resources/applicationContext.xml
+++ b/services/resource-service/server/src/main/resources/applicationContext.xml
@@ -6,7 +6,7 @@
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
- <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.file.FileBasedResourceBackend"
+ <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.datalake.DatalakeResourceBackend"
init-method="init" destroy-method="destroy"></bean>
</beans>
\ No newline at end of file
diff --git a/services/resource-service/stub/pom.xml b/services/resource-service/stub/pom.xml
index c1e00d3..b11afd5 100644
--- a/services/resource-service/stub/pom.xml
+++ b/services/resource-service/stub/pom.xml
@@ -32,6 +32,14 @@
<artifactId>mft-resource-service-stub</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>mft-common-proto</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
<build>
<extensions>
<extension>
@@ -49,6 +57,9 @@
<protocArtifact>com.google.protobuf:protoc:3.0.2:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.0.1:exe:${os.detected.classifier}</pluginArtifact>
+ <additionalProtoPathElements>
+ <additionalProtoPathElement>../../../common/mft-common-proto/src/main/proto</additionalProtoPathElement>
+ </additionalProtoPathElements>
</configuration>
<executions>
<execution>
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/AppConfig.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/AppConfig.java
index 8af548f..45b40e3 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/AppConfig.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/AppConfig.java
@@ -29,28 +29,4 @@ import java.io.IOException;
@Configuration
public class AppConfig {
-
- @Value("${custos.host}")
- private String custosHost;
-
- @Value("${custos.port}")
- private int custosPort;
-
- @Value("${custos.id}")
- private String custosId;
-
- @Value("${custos.secret}")
- private String custosSecret;
-
- @Bean
- public CustosClientsFactory custosClientsFactory() {
- return new CustosClientsFactory(custosHost, custosPort, custosId, custosSecret);
- }
-
- @Bean
- public AgentAuthenticationHandler agentAuthenticationHandler() throws IOException {
- return new AgentAuthenticationHandler(custosId, custosSecret, custosClientsFactory());
- }
-
-
}
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
index 33bd15f..601f2d8 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
@@ -33,11 +33,20 @@ import java.util.Optional;
public class CustosSecretBackend implements SecretBackend {
private static final Logger LOGGER = LoggerFactory.getLogger(CustosSecretBackend.class);
+ @Value("${custos.host}")
+ private String custosHost;
+
+ @Value("${custos.port}")
+ private int custosPort;
+
+ @Value("${custos.id}")
+ private String custosId;
+
+ @Value("${custos.secret}")
+ private String custosSecret;
- @Autowired
private AgentAuthenticationHandler handler;
- @Autowired
private CustosClientsFactory custosClientsFactory;
private IdentityManagementClient identityClient;
@@ -46,13 +55,13 @@ public class CustosSecretBackend implements SecretBackend {
private ResourceSecretManagementAgentClient csAgentClient;
- @Value("${custos.id}")
- private String custosId;
-
@Override
public void init() {
try {
+
+ custosClientsFactory = new CustosClientsFactory(custosHost, custosPort, custosId, custosSecret);
+ handler = new AgentAuthenticationHandler(custosId, custosSecret, custosClientsFactory);
Optional<CustosClientProvider> custosClientProvider = custosClientsFactory.getCustosClientProvider(custosId);
if (custosClientProvider.isPresent()) {
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/auth/AgentAuthenticationHandler.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/auth/AgentAuthenticationHandler.java
index 0f2c8c1..f27be5c 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/auth/AgentAuthenticationHandler.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/auth/AgentAuthenticationHandler.java
@@ -31,7 +31,6 @@ public class AgentAuthenticationHandler implements AuthenticationHandler, Closea
private IdentityManagementClient identityManagementClient;
- @Autowired
private CustosClientsFactory custosClientsFactory;
diff --git a/services/secret-service/server/src/main/resources/applicationContext.xml b/services/secret-service/server/src/main/resources/applicationContext.xml
index c815ce7..9bec46c 100644
--- a/services/secret-service/server/src/main/resources/applicationContext.xml
+++ b/services/secret-service/server/src/main/resources/applicationContext.xml
@@ -8,7 +8,7 @@
<!-- <bean id="resourceBackend" class="org.apache.airavata.mft.secret.server.backend.file.FileBasedSecretBackend"-->
<!-- init-method="init" destroy-method="destroy"></bean>-->
- <bean id="resourceBackend" class="org.apache.airavata.mft.secret.server.backend.custos.CustosSecretBackend"
+ <bean id="resourceBackend" class="org.apache.airavata.mft.secret.server.backend.file.FileBasedSecretBackend"
init-method="init" destroy-method="destroy"></bean>
</beans>
\ No newline at end of file
diff --git a/services/secret-service/server/src/main/resources/distribution/conf/applicationContext.xml b/services/secret-service/server/src/main/resources/distribution/conf/applicationContext.xml
index c815ce7..9bec46c 100644
--- a/services/secret-service/server/src/main/resources/distribution/conf/applicationContext.xml
+++ b/services/secret-service/server/src/main/resources/distribution/conf/applicationContext.xml
@@ -8,7 +8,7 @@
<!-- <bean id="resourceBackend" class="org.apache.airavata.mft.secret.server.backend.file.FileBasedSecretBackend"-->
<!-- init-method="init" destroy-method="destroy"></bean>-->
- <bean id="resourceBackend" class="org.apache.airavata.mft.secret.server.backend.custos.CustosSecretBackend"
+ <bean id="resourceBackend" class="org.apache.airavata.mft.secret.server.backend.file.FileBasedSecretBackend"
init-method="init" destroy-method="destroy"></bean>
</beans>
\ No newline at end of file