You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/11 16:18:50 UTC

[airavata-data-lake] branch master updated: Bug fix drms

This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new caa27c4  Bug fix drms
     new 16b56bb  Merge pull request #24 from isururanawaka/workflow_invocation
caa27c4 is described below

commit caa27c47f8ec8cba905f09926ac6f113fe0669e5
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Sun Jul 11 12:17:49 2021 -0400

    Bug fix drms
---
 .../orchestrator/connectors/DRMSConnector.java     | 121 ++++++++++-----------
 .../processor/OutboundEventProcessor.java          |  28 ++++-
 .../drms-rest-proxy/src/main/resources/drms.pb     | Bin 109344 -> 109314 bytes
 .../src/main/proto/storage/StorageService.proto    |   6 +-
 4 files changed, 83 insertions(+), 72 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index f87542e..4f9cbab 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -28,7 +28,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
 
     private ManagedChannel drmsChannel;
     private ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub;
-    private StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub storagePreferenceServiceBlockingStub;
+    private StorageServiceGrpc.StorageServiceBlockingStub storageServiceBlockingStub;
 
     public DRMSConnector(Configuration configuration) throws Exception {
         this.init(configuration);
@@ -40,7 +40,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
                 .forAddress(configuration.getOutboundEventProcessor().getDrmsHost(),
                         configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
         this.resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(drmsChannel);
-        this.storagePreferenceServiceBlockingStub = StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
+        this.storageServiceBlockingStub = StorageServiceGrpc.newBlockingStub(drmsChannel);
 
     }
 
@@ -54,67 +54,62 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
         return !this.drmsChannel.isShutdown();
     }
 
-//    public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
-//        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-//                .setAccessToken(entity.getAuthToken())
-//                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-//                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-//                        .setUsername(entity.getOwnerId())
-//                        .setTenantId(entity.getTenantId())
-//                        .build())
-//                .build();
-//        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
-//                .setAuthToken(serviceAuthToken)
-//                .build();
-//        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
-//        List<TransferMapping> transferMappingList = response.getMappingsList();
-//        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
-//        if (!transferMappingList.isEmpty()) {
-//            transferMappingList.forEach(transferMapping -> {
-//                if (transferMapping.getSourceStoragePreference().getStorageCase()
-//                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-//                    if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
-//                            .getStorage().getHostName().equals(hostname)) {
-//                        storagePreferenceId
-//                                .set(transferMapping.getSourceStoragePreference()
-//                                        .getSshStoragePreference().getStoragePreferenceId());
-//                    }
-//                }
-//            });
-//        }
-//        return Optional.ofNullable(storagePreferenceId.get());
-//    }
-//
-//    public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
-//        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-//                .setAccessToken(entity.getAuthToken())
-//                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-//                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-//                        .setUsername(entity.getOwnerId())
-//                        .setTenantId(entity.getTenantId())
-//                        .build())
-//                .build();
-//        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
-//                .setAuthToken(serviceAuthToken)
-//                .build();
-//        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
-//        List<TransferMapping> transferMappingList = response.getMappingsList();
-//        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
-//        if (!transferMappingList.isEmpty()) {
-//            transferMappingList.forEach(transferMapping -> {
-//                if (transferMapping.getDestinationStoragePreference().getStorageCase()
-//                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-//                    if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
-//                            .getStorage().getHostName().equals(hostname)) {
-//                        storagePreferenceId
-//                                .set(transferMapping.getDestinationStoragePreference()
-//                                        .getSshStoragePreference().getStoragePreferenceId());
-//                    }
-//                }
-//            });
-//        }
-//        return Optional.ofNullable(storagePreferenceId.get());
-//    }
+    public Optional<TransferMapping> getActiveTransferMapping(DataOrchestratorEntity entity, String hostname) {
+        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(entity.getAuthToken())
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(entity.getOwnerId())
+                        .setTenantId(entity.getTenantId())
+                        .build())
+                .build();
+        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+                .setAuthToken(serviceAuthToken)
+                .build();
+        FindTransferMappingsResponse response = storageServiceBlockingStub.getTransferMappings(request);
+        List<TransferMapping> transferMappingList = response.getMappingsList();
+        AtomicReference<TransferMapping> transferMappingOp = new AtomicReference<>(null);
+        if (!transferMappingList.isEmpty()) {
+            transferMappingList.forEach(transferMapping -> {
+                if (transferMapping.getSourceStorage().getStorageCase()
+                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+                    if (transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname)) {
+                            transferMappingOp.set(transferMapping);
+                    }
+                }
+            });
+        }
+        return Optional.ofNullable(transferMappingOp.get());
+    }
+
+    public Optional<String> getDestinationStorageId(DataOrchestratorEntity entity, String hostname) {
+        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(entity.getAuthToken())
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(entity.getOwnerId())
+                        .setTenantId(entity.getTenantId())
+                        .build())
+                .build();
+        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+                .setAuthToken(serviceAuthToken)
+                .build();
+        FindTransferMappingsResponse response = storageServiceBlockingStub.getTransferMappings(request);
+        List<TransferMapping> transferMappingList = response.getMappingsList();
+        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+        if (!transferMappingList.isEmpty()) {
+            transferMappingList.forEach(transferMapping -> {
+                if (transferMapping.getDestinationStorage().getStorageCase()
+                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+                    if (transferMapping.getDestinationStorage().getSshStorage().getHostName().equals(hostname)) {
+                        storagePreferenceId
+                                .set(transferMapping.getDestinationStorage().getSshStorage().getStorageId());
+                    }
+                }
+            });
+        }
+        return Optional.ofNullable(storagePreferenceId.get());
+    }
 
 
     public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 013ee59..522d8a7 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -1,6 +1,7 @@
 package org.apache.airavata.datalake.orchestrator.processor;
 
 import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.TransferMapping;
 import org.apache.airavata.datalake.orchestrator.Configuration;
 import org.apache.airavata.datalake.orchestrator.Utils;
 import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
@@ -89,27 +90,28 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
             String tail = resourcePath.substring(resourcePath.indexOf(ownerId));
             String[] collections = tail.split("/");
 
-//            Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
-            Optional<String> optionalStorPref = null;
+            Optional<TransferMapping> optionalStorPref = drmsConnector.getActiveTransferMapping(entity, entity.getHostName());
             if (optionalStorPref.isEmpty()) {
                 entity.setEventStatus(EventStatus.ERRORED.name());
-                entity.setError("StoragePreference not found for host: " + entity.getHostName());
+                entity.setError("Storage not found for host: " + entity.getHostName());
                 repository.save(entity);
                 return;
             }
 
-            String parentId = optionalStorPref.get();
+            TransferMapping transferMapping = optionalStorPref.get();
+            String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
+            String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
 
+            String parentId = sourceStorageId;
             for (int i = 1; i < collections.length - 1; i++) {
                 String resourceName = collections[i];
                 String path = entity.getResourcePath().substring(0, entity.getResourcePath().indexOf(resourceName));
                 path = path.concat(resourceName);
                 String entityId = Utils.getId(path);
                 Optional<GenericResource> optionalGenericResource =
-                        this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, parentId, "COLLECTION");
+                        this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, sourceStorageId, "COLLECTION");
                 if (optionalGenericResource.isPresent()) {
                     parentId = optionalGenericResource.get().getResourceId();
-
                 } else {
                     entity.setEventStatus(EventStatus.ERRORED.name());
                     entity.setError("Collection structure creation failed: " + entity.getHostName());
@@ -123,6 +125,20 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
                             collections[collections.length - 1], entity.getResourcePath(),
                             parentId, "FILE");
 
+            String dstResourceHost = transferMapping.getDestinationStorage().getSshStorage().getHostName();
+            String destinationResourceId = dstResourceHost+":"+ entity.getResourcePath() + ":" + entity.getResourceType();
+            String messageId  = Utils.getId(destinationResourceId);
+
+            Optional<GenericResource> destinationFile = this.drmsConnector.createResource(repository, entity, messageId,
+                    entity.getResourceName(),
+                    entity.getResourcePath(),
+                    destinationStorageId,
+                    "FILE");
+
+            Optional<GenericResource> optionalGenericResourceDST =
+                    this.drmsConnector.createResource(repository, entity, messageId,
+                            collections[collections.length - 1], entity.getResourcePath(),
+                            destinationStorageId, "FILE");
 
             if (optionalGenericResource.isPresent()) {
                 this.workflowServiceConnector.invokeWorkflow(repository, entity, optionalGenericResource.get());
diff --git a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 0227833..1b745c1 100644
Binary files a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb and b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb differ
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
index 3f5b3a2..bd89026 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
@@ -167,19 +167,19 @@ service StorageService {
 
   rpc createTransferMapping (CreateTransferMappingRequest) returns (CreateTransferMappingResponse) {
     option (google.api.http) = {
-      post: "/v1.0/api/drms/storagePreference/transferMapping"
+      post: "/v1.0/api/drms/storage/transferMapping"
     };
   }
 
   rpc getTransferMappings (FindTransferMappingsRequest) returns (FindTransferMappingsResponse) {
     option (google.api.http) = {
-      get: "/v1.0/api/drms/storagePreference/transferMapping"
+      get: "/v1.0/api/drms/storage/transferMapping"
     };
   }
 
   rpc deleteTransferMappings (DeleteTransferMappingRequest) returns (google.protobuf.Empty) {
     option (google.api.http) = {
-      delete: "/v1.0/api/drms/storagePreference/transferMapping"
+      delete: "/v1.0/api/drms/storage/transferMapping"
     };
   }
 }
\ No newline at end of file