You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/11 00:29:11 UTC
[airavata-data-lake] branch master updated: Move transfer mappings to storage
This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new f1200a1 Move transfer mappings to storage
new 4bb9ee4 Merge pull request #23 from isururanawaka/workflow_invocation
f1200a1 is described below
commit f1200a1a77d1647b75ea28824eeddb1463b0fdcc
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Sat Jul 10 20:28:23 2021 -0400
Move transfer mappings to storage
---
.../orchestrator/connectors/DRMSConnector.java | 122 +++++++-------
.../processor/OutboundEventProcessor.java | 3 +-
.../java/org/apache/airavata/drms/api/Client.java | 32 ++--
.../handlers/StoragePreferenceServiceHandler.java | 169 --------------------
.../drms/api/handlers/StorageServiceHandler.java | 177 +++++++++++++++++++++
.../deserializer/TransferMappingDeserializer.java | 22 +--
.../drms-rest-proxy/src/main/resources/drms.pb | Bin 109388 -> 109344 bytes
.../preference/StoragePreferenceService.proto | 50 ------
.../src/main/proto/storage/StorageService.proto | 57 +++++++
9 files changed, 324 insertions(+), 308 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index d76ffa6..f87542e 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -54,67 +54,67 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
return !this.drmsChannel.isShutdown();
}
- public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
- DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
- .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
- .setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(entity.getOwnerId())
- .setTenantId(entity.getTenantId())
- .build())
- .build();
- FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
- .setAuthToken(serviceAuthToken)
- .build();
- FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
- List<TransferMapping> transferMappingList = response.getMappingsList();
- AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
- if (!transferMappingList.isEmpty()) {
- transferMappingList.forEach(transferMapping -> {
- if (transferMapping.getSourceStoragePreference().getStorageCase()
- .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
- if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
- .getStorage().getHostName().equals(hostname)) {
- storagePreferenceId
- .set(transferMapping.getSourceStoragePreference()
- .getSshStoragePreference().getStoragePreferenceId());
- }
- }
- });
- }
- return Optional.ofNullable(storagePreferenceId.get());
- }
-
- public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
- DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
- .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
- .setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(entity.getOwnerId())
- .setTenantId(entity.getTenantId())
- .build())
- .build();
- FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
- .setAuthToken(serviceAuthToken)
- .build();
- FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
- List<TransferMapping> transferMappingList = response.getMappingsList();
- AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
- if (!transferMappingList.isEmpty()) {
- transferMappingList.forEach(transferMapping -> {
- if (transferMapping.getDestinationStoragePreference().getStorageCase()
- .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
- if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
- .getStorage().getHostName().equals(hostname)) {
- storagePreferenceId
- .set(transferMapping.getDestinationStoragePreference()
- .getSshStoragePreference().getStoragePreferenceId());
- }
- }
- });
- }
- return Optional.ofNullable(storagePreferenceId.get());
- }
+// public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+// DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+// .setAccessToken(entity.getAuthToken())
+// .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+// .setUsername(entity.getOwnerId())
+// .setTenantId(entity.getTenantId())
+// .build())
+// .build();
+// FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+// .setAuthToken(serviceAuthToken)
+// .build();
+// FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+// List<TransferMapping> transferMappingList = response.getMappingsList();
+// AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+// if (!transferMappingList.isEmpty()) {
+// transferMappingList.forEach(transferMapping -> {
+// if (transferMapping.getSourceStoragePreference().getStorageCase()
+// .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+// if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
+// .getStorage().getHostName().equals(hostname)) {
+// storagePreferenceId
+// .set(transferMapping.getSourceStoragePreference()
+// .getSshStoragePreference().getStoragePreferenceId());
+// }
+// }
+// });
+// }
+// return Optional.ofNullable(storagePreferenceId.get());
+// }
+//
+// public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+// DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+// .setAccessToken(entity.getAuthToken())
+// .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+// .setUsername(entity.getOwnerId())
+// .setTenantId(entity.getTenantId())
+// .build())
+// .build();
+// FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+// .setAuthToken(serviceAuthToken)
+// .build();
+// FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+// List<TransferMapping> transferMappingList = response.getMappingsList();
+// AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+// if (!transferMappingList.isEmpty()) {
+// transferMappingList.forEach(transferMapping -> {
+// if (transferMapping.getDestinationStoragePreference().getStorageCase()
+// .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+// if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
+// .getStorage().getHostName().equals(hostname)) {
+// storagePreferenceId
+// .set(transferMapping.getDestinationStoragePreference()
+// .getSshStoragePreference().getStoragePreferenceId());
+// }
+// }
+// });
+// }
+// return Optional.ofNullable(storagePreferenceId.get());
+// }
public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 82e554c..013ee59 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -89,7 +89,8 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
String tail = resourcePath.substring(resourcePath.indexOf(ownerId));
String[] collections = tail.split("/");
- Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+// Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+ Optional<String> optionalStorPref = null;
if (optionalStorPref.isEmpty()) {
entity.setEventStatus(EventStatus.ERRORED.name());
entity.setError("StoragePreference not found for host: " + entity.getHostName());
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
index e0e9428..9611fee 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
@@ -23,7 +23,7 @@ import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.*;
-import org.apache.airavata.datalake.drms.storage.preference.ssh.SSHStoragePreference;
+import org.apache.airavata.datalake.drms.storage.ssh.SSHStorage;
import org.apache.custos.clients.CustosClientProvider;
import org.apache.custos.identity.management.client.IdentityManagementClient;
import org.json.JSONObject;
@@ -55,7 +55,7 @@ public class Client {
// StorageCreateRequest request = StorageCreateRequest.newBuilder().setAuthToken(authToken).
// setStorage(AnyStorage.newBuilder().setSshStorage(SSHStorage.newBuilder()
-// .setStorageId("qwerft-rftgyhu-oplmnj")
+// .setStorageId("testing.com")
// .setHostName("localhost")
// .setPort(3565)
// .build())
@@ -116,31 +116,39 @@ public class Client {
TransferMapping transferMapping = TransferMapping.newBuilder()
.setUserId("isjarana@iu.edu")
.setTransferScope(TransferScope.GLOBAL)
- .setDestinationStoragePreference(AnyStoragePreference.newBuilder()
- .setSshStoragePreference(SSHStoragePreference.newBuilder()
- .setStoragePreferenceId("ssh_storage_preference").build()))
- .setSourceStoragePreference(AnyStoragePreference.newBuilder()
- .setSshStoragePreference(SSHStoragePreference.newBuilder()
- .setStoragePreferenceId("ssh_storage_preference_2").build()))
+ .setSourceStorage(AnyStorage
+ .newBuilder()
+ .setSshStorage(SSHStorage.newBuilder().setStorageId("testing.com")
+ .build())
+ .build())
+ .setDestinationStorage(AnyStorage
+ .newBuilder()
+ .setSshStorage(SSHStorage.newBuilder().setStorageId("qwerft-rftgyhu-oplmnj")
+ .build())
+ .build())
.build();
-
+//
CreateTransferMappingRequest request = CreateTransferMappingRequest.newBuilder()
.setAuthToken(authToken)
.setTransferMapping(transferMapping)
.build();
+// resourceClient.createTransferMapping(request);
+//
FindTransferMappingsRequest findTransferMappingsRequest = FindTransferMappingsRequest.newBuilder()
.setAuthToken(authToken)
.build();
-
+// resourceClient.getTransferMappings(findTransferMappingsRequest);
+//
DeleteTransferMappingRequest transferMappingRequest = DeleteTransferMappingRequest.newBuilder()
.setAuthToken(authToken)
- .setId("ssh_storage_preference_2_ssh_storage_preference")
+ .setId("testing.com_qwerft-rftgyhu-oplmnj")
.build();
+ resourceClient.deleteTransferMappings(transferMappingRequest);
// storagePreferenceServiceBlockingStub.deleteTransferMappings(transferMappingRequest);
- storagePreferenceServiceBlockingStub.createTransferMapping(request);
+// storagePreferenceServiceBlockingStub.createTransferMapping(request);
ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(channel);
// ResourceSearchQuery query = ResourceSearchQuery.newBuilder().setField("type").setValue("COLLECTION").build();
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 7221e6d..57858f9 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -285,177 +285,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
}
}
- @Override
- public void createTransferMapping(CreateTransferMappingRequest request, StreamObserver<CreateTransferMappingResponse> responseObserver) {
- try {
- AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
- AnyStoragePreference sourceStoragePreference = request.getTransferMapping().getSourceStoragePreference();
- AnyStoragePreference destinationStoragePreference = request.getTransferMapping().getDestinationStoragePreference();
- String sourceId = getStorageId(sourceStoragePreference);
- String destinationId = getStorageId(destinationStoragePreference);
-
- TransferScope scope = request.getTransferMapping().getTransferScope();
- Map<String, Object> properties = new HashMap<>();
- Map<String, Object> props = new HashMap<>();
- props.put("tenantId", authenticatedUser.getTenantId());
- props.put("owner", authenticatedUser.getUsername());
- props.put("srcStoragePreferenceId", sourceId);
- props.put("dstStoragePreferenceId", destinationId);
- String entityId = sourceId + "_" + destinationId;
- if (scope.equals(TransferScope.GLOBAL)) {
- props.put("scope", TransferScope.GLOBAL.name());
- } else {
- props.put("scope", TransferScope.USER.name());
- }
- properties.put("props", props);
- properties.put("tenantId", authenticatedUser.getTenantId());
- properties.put("entityId", entityId);
- properties.put("username", authenticatedUser.getUsername());
- properties.put("srcStoragePreferenceId", sourceId);
- properties.put("dstStoragePreferenceId", destinationId);
- properties.put("owner", authenticatedUser.getUsername());
-
-
- if (hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), sourceId) &&
- hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), destinationId)) {
- String query = " Match (u:User), (srcSp:StoragePreference), (dstSp:StoragePreference) where " +
- " u.username=$username AND u.tenantId=$tenantId AND " +
- "srcSp.storagePreferenceId=$srcStoragePreferenceId AND " +
- "srcSp.tenantId = $tenantId AND dstSp.storagePreferenceId=$dstStoragePreferenceId " +
- "AND dstSp.tenantId =$tenantId " +
- " Merge (u)-[:HAS_TRANSFER_MAPPING]->(tm:TransferMapping{entityId:$entityId, tenantId:$tenantId, " +
- "srcStoragePreferenceId:$srcStoragePreferenceId," +
- "dstStoragePreferenceId:$dstStoragePreferenceId,owner:$owner}) set tm += $props" +
- " Merge (tm)<-[:TRANSFER_OUT]-(srcSp)" +
- " Merge (tm)-[:TRANSFER_IN]->(dstSp) return (tm)";
- this.neo4JConnector.runTransactionalQuery(properties, query);
-
- String searchQuery = " Match (srcStr:Storage)<-[:CHILD_OF]-" +
- "(srcSp:StoragePreference)-[:TRANSFER_OUT]->(tm:TransferMapping)" +
- "-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage) where " +
- " tm.entityId=$entityId AND tm.tenantId=$tenantId return srcStr, srcSp, dstStr, dstSp, tm";
- List<Record> records = this.neo4JConnector.searchNodes(properties, searchQuery);
- if (!records.isEmpty()) {
- List<TransferMapping> transferMappings = TransferMappingDeserializer.deserializeList(records);
- if (!transferMappings.isEmpty()) {
- CreateTransferMappingResponse response = CreateTransferMappingResponse
- .newBuilder()
- .setTransferMapping(transferMappings.get(0))
- .build();
- responseObserver.onNext(response);
- responseObserver.onCompleted();
- } else {
- String msg = "Errored while creating transfer mapping; Message:";
- logger.error("Errored while creating transfer mapping; Message:");
- responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- } else {
- String msg = "Errored while creating transfer mapping; Message:";
- logger.error("Errored while creating transfer mapping; Message:");
- responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- } else {
- String msg = "User does not have permission to create mapping ";
- logger.error("User does not have permission to create mapping ");
- responseObserver.onError(Status.PERMISSION_DENIED.withDescription(msg).asRuntimeException());
-
- }
- } catch (Exception e) {
- String msg = "Errored while creating transfer mapping; Message:" + e.getMessage();
- logger.error("Errored while creating transfer mapping; Message: {}", e.getMessage(), e);
- responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- }
-
- @Override
- public void getTransferMappings(FindTransferMappingsRequest request, StreamObserver<FindTransferMappingsResponse> responseObserver) {
- try {
- List<TransferMapping> transferMappings = new ArrayList<>();
- AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
- Map<String, Object> properties = new HashMap<>();
- properties.put("username", authenticatedUser.getUsername());
- properties.put("tenantId", authenticatedUser.getTenantId());
- properties.put("scope", TransferScope.USER.name());
- String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->(t:TransferMapping{scope:$scope}) where u.username = $username AND u.tenantId = $tenantId" +
- " Match (srcStr:Storage)<-[:CHILD_OF]-(srcSp:StoragePreference)-[:TRANSFER_OUT]->(t)-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)" +
- " return srcStr, srcSp, dstStr, dstSp, t";
- List<Record> records = this.neo4JConnector.searchNodes(properties, query);
- properties.put("scope", TransferScope.GLOBAL.name());
- String queryFetchGlobal = "Match (srcStr:Storage)<-[:CHILD_OF]-" +
- "(srcSp:StoragePreference)-[:TRANSFER_OUT]->(t:TransferMapping{scope:$scope, tenantId:$tenantId})-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)" +
- " return srcStr, srcSp, dstStr, dstSp, t";
- List<Record> globalRecords = this.neo4JConnector.searchNodes(properties, queryFetchGlobal);
- if (!records.isEmpty()) {
- transferMappings = TransferMappingDeserializer.deserializeList(records);
- }
- if (!globalRecords.isEmpty()) {
- transferMappings.addAll(TransferMappingDeserializer.deserializeList(globalRecords));
- }
- FindTransferMappingsResponse findTransferMappingsResponse = FindTransferMappingsResponse
- .newBuilder()
- .addAllMappings(transferMappings)
- .build();
- responseObserver.onNext(findTransferMappingsResponse);
- responseObserver.onCompleted();
-
- } catch (Exception ex) {
- String msg = "Errored while fetching transfer mappings, Message:" + ex.getMessage();
- logger.error("Errored while fetching transfer mappings, Message: {}", ex.getMessage(), ex);
- responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- }
-
- @Override
- public void deleteTransferMappings(DeleteTransferMappingRequest request, StreamObserver<Empty> responseObserver) {
- try {
- AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
- String transferMappingId = request.getId();
- Map<String, Object> properties = new HashMap<>();
- properties.put("username", authenticatedUser.getUsername());
- properties.put("tenantId", authenticatedUser.getTenantId());
- properties.put("entityId", transferMappingId);
- String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->" +
- "(t:TransferMapping{entityId:$entityId})" +
- " where u.username = $username AND u.tenantId = $tenantId detach delete t";
- this.neo4JConnector.runTransactionalQuery(properties, query);
- responseObserver.onNext(Empty.newBuilder().build());
- responseObserver.onCompleted();
- } catch (Exception ex) {
- String msg = "Errored while delete transfer mappings, Message:" + ex.getMessage();
- logger.error("Errored while delete transfer mappings, Message: {}", ex.getMessage(), ex);
- responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- }
- private boolean hasAccess(String username, String tenantId, String storagePrefId) throws Exception {
- Map<String, Object> userProps = new HashMap<>();
- userProps.put("username", username);
- userProps.put("tenantId", tenantId);
- userProps.put("entityId", storagePrefId);
- List<Record> records = this.neo4JConnector.searchNodes(userProps,
- " MATCH (u:User) where u.username = $username AND u.tenantId = $tenantId" +
- " OPTIONAL MATCH (u)<-[:SHARED_WITH]-(s1:Storage)<-[:CHILD_OF]->(sp1:StoragePreference{entityId:$entityId})" +
- " OPTIONAL MATCH (cg:Group)-[:CHILD_OF*]->(g:Group)<-[:MEMBER_OF]-(u)" +
- " OPTIONAL MATCH (sp2:StoragePreference{entityId:$entityId})-[:CHILD_OF]->(s2:Storage)-[:SHARED_WITH]->(cg) " +
- " OPTIONAL MATCH (sp3:StoragePreference {entityId:$entityId})-[:CHILD_OF]->(s3:Storage)-[:SHARED_WITH]->(g)" +
- " OPTIONAL MATCH (s4:Storage)<-[:CHILD_OF]->(sp4:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(u)" +
- " OPTIONAL MATCH (s5:Storage)<-[:CHILD_OF]->(sp5:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(cg)" +
- " OPTIONAL MATCH (s6:Storage)<-[:CHILD_OF]->(sp6:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(g)" +
- " return distinct s1, sp1, s2, sp2, s3, sp3, s4,sp4, s5,sp5, s6,sp6");
- if (!records.isEmpty()) {
- return true;
- }
- return false;
- }
- private String getStorageId(AnyStoragePreference storage) {
- if (storage.getStorageCase()
- .equals(AnyStoragePreference.StorageCase.S3_STORAGE_PREFERENCE)) {
- return storage.getS3StoragePreference().getStoragePreferenceId();
- } else {
- return storage.getSshStoragePreference().getStoragePreferenceId();
- }
- }
}
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
index 14abb21..a9dc2d0 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
@@ -17,6 +17,7 @@
package org.apache.airavata.drms.api.handlers;
import com.google.protobuf.Empty;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.datalake.drms.AuthenticatedUser;
import org.apache.airavata.datalake.drms.storage.*;
@@ -24,6 +25,7 @@ import org.apache.airavata.drms.api.utils.CustosUtils;
import org.apache.airavata.drms.core.Neo4JConnector;
import org.apache.airavata.drms.core.constants.StorageConstants;
import org.apache.airavata.drms.core.deserializer.AnyStorageDeserializer;
+import org.apache.airavata.drms.core.deserializer.TransferMappingDeserializer;
import org.apache.airavata.drms.core.serializer.AnyStorageSerializer;
import org.apache.custos.clients.CustosClientProvider;
import org.lognet.springboot.grpc.GRpcService;
@@ -32,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -187,4 +190,178 @@ public class StorageServiceHandler extends StorageServiceGrpc.StorageServiceImpl
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
}
}
+
+
+ @Override
+ public void createTransferMapping(CreateTransferMappingRequest request, StreamObserver<CreateTransferMappingResponse> responseObserver) {
+ try {
+ AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
+ AnyStorage sourceStoragePreference = request.getTransferMapping().getSourceStorage();
+ AnyStorage destinationStoragePreference = request.getTransferMapping().getDestinationStorage();
+ String sourceId = getStorageId(sourceStoragePreference);
+ String destinationId = getStorageId(destinationStoragePreference);
+
+ TransferScope scope = request.getTransferMapping().getTransferScope();
+ Map<String, Object> properties = new HashMap<>();
+ Map<String, Object> props = new HashMap<>();
+ props.put("tenantId", authenticatedUser.getTenantId());
+ props.put("owner", authenticatedUser.getUsername());
+ props.put("srcStorageId", sourceId);
+ props.put("dstStorageId", destinationId);
+ String entityId = sourceId + "_" + destinationId;
+ if (scope.equals(TransferScope.GLOBAL)) {
+ props.put("scope", TransferScope.GLOBAL.name());
+ } else {
+ props.put("scope", TransferScope.USER.name());
+ }
+ properties.put("props", props);
+ properties.put("tenantId", authenticatedUser.getTenantId());
+ properties.put("entityId", entityId);
+ properties.put("username", authenticatedUser.getUsername());
+ properties.put("srcStorageId", sourceId);
+ properties.put("dstStorageId", destinationId);
+ properties.put("owner", authenticatedUser.getUsername());
+
+
+ if (hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), sourceId) &&
+ hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), destinationId)) {
+ String query = " Match (u:User), (srcSp:Storage), (dstSp:Storage) where " +
+ " u.username=$username AND u.tenantId=$tenantId AND " +
+ "srcSp.storageId=$srcStorageId AND " +
+ "srcSp.tenantId = $tenantId AND dstSp.storageId=$dstStorageId " +
+ "AND dstSp.tenantId =$tenantId " +
+ " Merge (u)-[:HAS_TRANSFER_MAPPING]->(tm:TransferMapping{entityId:$entityId, tenantId:$tenantId, " +
+ "srcStorageId:$srcStorageId," +
+ "dstStorageId:$dstStorageId,owner:$owner}) set tm += $props" +
+ " Merge (tm)<-[:TRANSFER_OUT]-(srcSp)" +
+ " Merge (tm)-[:TRANSFER_IN]->(dstSp) return (tm)";
+ this.neo4JConnector.runTransactionalQuery(properties, query);
+
+ String searchQuery = " Match (srcStr:Storage)-[:TRANSFER_OUT]->(tm:TransferMapping)" +
+ "-[:TRANSFER_IN]->(dstStr:Storage) where " +
+ " tm.entityId=$entityId AND tm.tenantId=$tenantId return srcStr, dstStr, tm";
+ List<Record> records = this.neo4JConnector.searchNodes(properties, searchQuery);
+ if (!records.isEmpty()) {
+ List<TransferMapping> transferMappings = TransferMappingDeserializer.deserializeList(records);
+ if (!transferMappings.isEmpty()) {
+ CreateTransferMappingResponse response = CreateTransferMappingResponse
+ .newBuilder()
+ .setTransferMapping(transferMappings.get(0))
+ .build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ } else {
+ String msg = "Errored while creating transfer mapping; Message:";
+ logger.error("Errored while creating transfer mapping; Message:");
+ responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ } else {
+ String msg = "Errored while creating transfer mapping; Message:";
+ logger.error("Errored while creating transfer mapping; Message:");
+ responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ } else {
+ String msg = "User does not have permission to create mapping ";
+ logger.error("User does not have permission to create mapping ");
+ responseObserver.onError(Status.PERMISSION_DENIED.withDescription(msg).asRuntimeException());
+
+ }
+ } catch (Exception e) {
+ String msg = "Errored while creating transfer mapping; Message:" + e.getMessage();
+ logger.error("Errored while creating transfer mapping; Message: {}", e.getMessage(), e);
+ responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ }
+
+ @Override
+ public void getTransferMappings(FindTransferMappingsRequest request, StreamObserver<FindTransferMappingsResponse> responseObserver) {
+ try {
+ List<TransferMapping> transferMappings = new ArrayList<>();
+ AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("username", authenticatedUser.getUsername());
+ properties.put("tenantId", authenticatedUser.getTenantId());
+ properties.put("scope", TransferScope.USER.name());
+ String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->(t:TransferMapping{scope:$scope}) where u.username = $username AND u.tenantId = $tenantId" +
+ " Match (srcStr:Storage)-[:TRANSFER_OUT]->(t)-[:TRANSFER_IN]->(dstStr:Storage)" +
+ " return srcStr, dstStr, t";
+ List<Record> records = this.neo4JConnector.searchNodes(properties, query);
+ properties.put("scope", TransferScope.GLOBAL.name());
+ String queryFetchGlobal = "Match (srcStr:Storage)<-[:TRANSFER_OUT]->(t:TransferMapping{scope:$scope, tenantId:$tenantId})-[:TRANSFER_IN]->(dstStr:Storage)" +
+ " return srcStr, dstStr, t";
+ List<Record> globalRecords = this.neo4JConnector.searchNodes(properties, queryFetchGlobal);
+ if (!records.isEmpty()) {
+ transferMappings = TransferMappingDeserializer.deserializeList(records);
+ }
+ if (!globalRecords.isEmpty()) {
+ transferMappings.addAll(TransferMappingDeserializer.deserializeList(globalRecords));
+ }
+ FindTransferMappingsResponse findTransferMappingsResponse = FindTransferMappingsResponse
+ .newBuilder()
+ .addAllMappings(transferMappings)
+ .build();
+ responseObserver.onNext(findTransferMappingsResponse);
+ responseObserver.onCompleted();
+
+ } catch (Exception ex) {
+ String msg = "Errored while fetching transfer mappings, Message:" + ex.getMessage();
+ logger.error("Errored while fetching transfer mappings, Message: {}", ex.getMessage(), ex);
+ responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteTransferMappings(DeleteTransferMappingRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
+ String transferMappingId = request.getId();
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("username", authenticatedUser.getUsername());
+ properties.put("tenantId", authenticatedUser.getTenantId());
+ properties.put("entityId", transferMappingId);
+ String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->" +
+ "(t:TransferMapping{entityId:$entityId})" +
+ " where u.username = $username AND u.tenantId = $tenantId detach delete t";
+ this.neo4JConnector.runTransactionalQuery(properties, query);
+ responseObserver.onNext(Empty.newBuilder().build());
+ responseObserver.onCompleted();
+
+ } catch (Exception ex) {
+ String msg = "Errored while delete transfer mappings, Message:" + ex.getMessage();
+ logger.error("Errored while delete transfer mappings, Message: {}", ex.getMessage(), ex);
+ responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ }
+
+
+ private String getStorageId(AnyStorage storage) {
+ if (storage.getStorageCase()
+ .equals(AnyStorage.StorageCase.S3_STORAGE)) {
+ return storage.getS3Storage().getStorageId();
+ } else {
+ return storage.getSshStorage().getStorageId();
+ }
+ }
+
+ private boolean hasAccess(String username, String tenantId, String storageId) throws Exception {
+ Map<String, Object> userProps = new HashMap<>();
+ userProps.put("username", username);
+ userProps.put("tenantId", tenantId);
+ userProps.put("entityId", storageId);
+
+ List<Record> records = this.neo4JConnector.searchNodes(userProps,
+ " MATCH (u:User) where u.username = $username AND u.tenantId = $tenantId" +
+ " OPTIONAL MATCH (u)<-[:SHARED_WITH]-(s1:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp1:StoragePreference)" +
+ " OPTIONAL MATCH (cg:Group)-[:CHILD_OF*]->(g:Group)<-[:MEMBER_OF]-(u)" +
+ " OPTIONAL MATCH (sp2:StoragePreference)-[:CHILD_OF]->(s2:Storage{entityId:$entityId})-[:SHARED_WITH]->(cg) " +
+ " OPTIONAL MATCH (sp3:StoragePreference )-[:CHILD_OF]->(s3:Storage{entityId:$entityId})-[:SHARED_WITH]->(g)" +
+ " OPTIONAL MATCH (s4:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp4:StoragePreference)-[:SHARED_WITH]->(u)" +
+ " OPTIONAL MATCH (s5:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp5:StoragePreference)-[:SHARED_WITH]->(cg)" +
+ " OPTIONAL MATCH (s6:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp6:StoragePreference)-[:SHARED_WITH]->(g)" +
+ " return distinct s1, sp1, s2, sp2, s3, sp3, s4,sp4, s5,sp5, s6,sp6");
+ if (!records.isEmpty()) {
+ return true;
+ }
+ return false;
+ }
}
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
index 461bbe8..51e9a57 100644
--- a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
@@ -1,7 +1,6 @@
package org.apache.airavata.drms.core.deserializer;
import org.apache.airavata.datalake.drms.storage.AnyStorage;
-import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.drms.storage.TransferScope;
import org.neo4j.driver.Record;
@@ -20,29 +19,22 @@ public class TransferMappingDeserializer {
InternalRecord internalRecord = (InternalRecord) record;
List<Value> values = internalRecord.values();
- if (values.size() == 5) {
+ if (values.size() == 3) {
Value srcStr = values.get(0);
- Value srcSpr = values.get(1);
- Value dstStr = values.get(2);
- Value dstSp = values.get(3);
- Value tm = values.get(4);
+ Value dstStr = values.get(1);
+ Value tm = values.get(2);
- if (!srcStr.isNull() && !srcSpr.isNull() && !tm.isNull()
- && !dstStr.isNull() && !dstSp.isNull()) {
+ if (!srcStr.isNull() && !tm.isNull()
+ && !dstStr.isNull()) {
AnyStorage storage = AnyStorageDeserializer.deriveStorageFromMap(srcStr.asMap());
- AnyStoragePreference srcPreference = AnyStoragePreferenceDeserializer
- .deriveStoragePrefFromMap(srcSpr.asMap(), storage);
-
AnyStorage dstStorage = AnyStorageDeserializer.deriveStorageFromMap(dstStr.asMap());
- AnyStoragePreference dstPreference = AnyStoragePreferenceDeserializer
- .deriveStoragePrefFromMap(dstSp.asMap(), dstStorage);
Map<String, Object> map = tm.asMap();
TransferMapping transferMapping = TransferMapping.newBuilder()
.setTransferScope(TransferScope.valueOf(map.get("scope").toString()))
.setId(map.get("entityId").toString())
- .setSourceStoragePreference(srcPreference)
- .setDestinationStoragePreference(dstPreference)
+ .setSourceStorage(storage)
+ .setDestinationStorage(dstStorage)
.setUserId(map.get("owner").toString())
.build();
diff --git a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 21a39a1..0227833 100644
Binary files a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb and b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb differ
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto b/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
index 6c2c6ef..0b0bad2 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
@@ -81,43 +81,9 @@ message StoragePreferenceSearchResponse {
repeated AnyStoragePreference storages_preference = 2;
}
-enum TransferScope {
- UNKNOWN = 0;
- GLOBAL = 1;
- USER = 2;
-}
-
-message TransferMapping {
- string id = 1;
- string user_id = 2;
- AnyStoragePreference source_storage_preference = 3;
- AnyStoragePreference destination_storage_preference = 4;
- TransferScope transfer_scope = 5;
-}
-
-message CreateTransferMappingRequest {
- org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
- TransferMapping transfer_mapping = 2;
-}
-message CreateTransferMappingResponse {
- TransferMapping transfer_mapping = 1;
-}
-message FindTransferMappingsRequest {
- org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
- string id = 2;
- TransferScope transfer_scope = 3;
-}
-message FindTransferMappingsResponse {
- repeated TransferMapping mappings = 1;
-}
-
-message DeleteTransferMappingRequest {
- org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
- string id = 2;
-}
service StoragePreferenceService {
@@ -152,21 +118,5 @@ service StoragePreferenceService {
};
}
- rpc createTransferMapping (CreateTransferMappingRequest) returns (CreateTransferMappingResponse) {
- option (google.api.http) = {
- post: "/v1.0/api/drms/storagePreference/transferMapping"
- };
- }
- rpc getTransferMappings (FindTransferMappingsRequest) returns (FindTransferMappingsResponse) {
- option (google.api.http) = {
- get: "/v1.0/api/drms/storagePreference/transferMapping"
- };
- }
-
- rpc deleteTransferMappings (DeleteTransferMappingRequest) returns (google.protobuf.Empty) {
- option (google.api.http) = {
- delete: "/v1.0/api/drms/storagePreference/transferMapping"
- };
- }
}
\ No newline at end of file
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
index abab38a..3f5b3a2 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
@@ -88,6 +88,45 @@ message AddStorageMetadataRequest {
string value = 4;
}
+enum TransferScope { // Scope value carried by TransferMapping.transfer_scope and FindTransferMappingsRequest.transfer_scope.
+ UNKNOWN = 0; // Zero value of the enum.
+ GLOBAL = 1; // NOTE(review): presumably a mapping that applies to every user - confirm.
+ USER = 2; // NOTE(review): presumably a mapping that applies only to its owning user - confirm.
+}
+
+message TransferMapping { // Pairs a source storage with a destination storage under a transfer scope.
+ string id = 1; // Filled from the mapping's "entityId" property by TransferMappingDeserializer.
+ string user_id = 2; // Filled from the mapping's "owner" property by TransferMappingDeserializer.
+ AnyStorage source_storage = 3; // Storage message itself, not a storage preference.
+ AnyStorage destination_storage = 4; // Storage message itself, not a storage preference.
+ TransferScope transfer_scope = 5; // Parsed from the mapping's "scope" property via TransferScope.valueOf.
+}
+
+
+message CreateTransferMappingRequest { // Request message of the createTransferMapping rpc.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1; // Auth token accompanying the call.
+ TransferMapping transfer_mapping = 2; // Mapping to create.
+}
+
+message CreateTransferMappingResponse { // Response message of the createTransferMapping rpc.
+ TransferMapping transfer_mapping = 1; // NOTE(review): presumably the mapping as stored - confirm against the handler.
+}
+
+message FindTransferMappingsRequest { // Request message of the getTransferMappings rpc.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1; // Auth token accompanying the call.
+ string id = 2; // NOTE(review): which entity this id refers to is not visible here - confirm against the handler.
+ TransferScope transfer_scope = 3; // NOTE(review): presumably filters results by scope - confirm.
+}
+
+message FindTransferMappingsResponse { // Response message of the getTransferMappings rpc.
+ repeated TransferMapping mappings = 1; // Zero or more transfer mappings.
+}
+
+message DeleteTransferMappingRequest { // Request message of the deleteTransferMappings rpc.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1; // Auth token accompanying the call.
+ string id = 2; // NOTE(review): presumably the id of the transfer mapping to delete - confirm.
+}
+
service StorageService {
rpc fetchStorage (StorageFetchRequest) returns (StorageFetchResponse) {
@@ -125,4 +164,22 @@ service StorageService {
post: "/v1.0/api/drms/storage/metadata"
};
}
+
+ rpc createTransferMapping (CreateTransferMappingRequest) returns (CreateTransferMappingResponse) { // Bound to HTTP POST. NOTE(review): path is still under /storagePreference/ although the rpc lives in StorageService - confirm this is intentional.
+ option (google.api.http) = {
+ post: "/v1.0/api/drms/storagePreference/transferMapping"
+ };
+ }
+
+ rpc getTransferMappings (FindTransferMappingsRequest) returns (FindTransferMappingsResponse) { // Bound to HTTP GET. NOTE(review): path is still under /storagePreference/ although the rpc lives in StorageService - confirm this is intentional.
+ option (google.api.http) = {
+ get: "/v1.0/api/drms/storagePreference/transferMapping"
+ };
+ }
+
+ rpc deleteTransferMappings (DeleteTransferMappingRequest) returns (google.protobuf.Empty) { // Bound to HTTP DELETE; returns an empty message. NOTE(review): path is still under /storagePreference/ although the rpc lives in StorageService - confirm this is intentional.
+ option (google.api.http) = {
+ delete: "/v1.0/api/drms/storagePreference/transferMapping"
+ };
+ }
}
\ No newline at end of file