You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/09 15:42:47 UTC

[airavata-data-lake] branch master updated: Adding host name and resource path for all resources as metadata

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new e665320  Adding host name and resource path for all resources as metadata
e665320 is described below

commit e66532090690b0f0155619a8113c49f8eb5c28fe
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Sep 9 11:42:06 2021 -0400

    Adding host name and resource path for all resources as metadata
---
 .../orchestrator/connectors/DRMSConnector.java     | 30 ++++++++++++++++++++++
 .../handlers/async/OrchestratorEventProcessor.java | 29 ++++++++++++++++-----
 2 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index 350aee8..d8ee739 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -1,5 +1,7 @@
 package org.apache.airavata.datalake.orchestrator.connectors;
 
+import com.google.protobuf.Struct;
+import com.google.protobuf.Value;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import org.apache.airavata.datalake.drms.AuthCredentialType;
@@ -16,6 +18,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -174,6 +177,33 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
         }
     }
 
+    public void addResourceMetadata(String authToken,
+                                    String tenantId,
+                                    String resourceId,
+                                    String user,
+                                    Map<String, String> metadata) {
+        // Sends the given key/value metadata for resourceId to DRMS through resourceServiceBlockingStub.
+        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(authToken)
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL) // credential type is fixed, not caller-selectable
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(user)
+                        .setTenantId(tenantId)
+                        .build())
+                .build();
+
+        Struct.Builder structBuilder = Struct.newBuilder(); // each map entry becomes one string-valued Struct field
+        metadata.forEach((key, value) -> structBuilder.putFields(key,
+                Value.newBuilder().setStringValue(value).build())); // NOTE(review): presumably values are never null (protobuf setters reject null) - confirm against callers
+
+        resourceServiceBlockingStub.addResourceMetadata(AddResourceMetadataRequest.newBuilder() // the stub's return value is discarded
+                .setResourceId(resourceId)
+                .setAuthToken(serviceAuthToken)
+                .setType("FILE") // NOTE(review): hard-coded, yet this commit also calls the method for resources created as "COLLECTION" - confirm DRMS accepts "FILE" for those
+                .setMetadata(structBuilder.build()).build());
+    }
+
+
     public Optional<AnyStoragePreference> getStoragePreference(String authToken, String username, String tenantId, String storageId) {
         DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
                 .setAccessToken(authToken)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 1d20f59..c7a6e1b 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -62,7 +62,7 @@ public class OrchestratorEventProcessor implements Runnable {
         this.notificationClient = notificationClient;
     }
 
-    private List<GenericResource> createResourceRecursively(String storageId, String basePath,
+    private List<GenericResource> createResourceRecursively(String hostName, String storageId, String basePath,
                                                             String resourcePath, String resourceType, String user)
             throws Exception {
 
@@ -84,6 +84,13 @@ public class OrchestratorEventProcessor implements Runnable {
                             resourceId, resourceName, currentPath, parentId, "COLLECTION", parentType, user);
             if (optionalGenericResource.isPresent()) {
                 parentId = optionalGenericResource.get().getResourceId();
+
+                Map<String, String> metadata = new HashMap<>();
+                metadata.put("resourcePath", currentPath);
+                metadata.put("hostName", hostName);
+                this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
+                        notification.getTenantId(), parentId, user, metadata);
+
                 parentType = "COLLECTION";
                 resourceList.add(optionalGenericResource.get());
             } else {
@@ -102,7 +109,15 @@ public class OrchestratorEventProcessor implements Runnable {
                         parentId, resourceType, parentType, user);
 
         if (optionalGenericResource.isPresent()) {
-            resourceList.add(optionalGenericResource.get());
+            GenericResource genericResource = optionalGenericResource.get();
+
+            Map<String, String> metadata = new HashMap<>();
+            metadata.put("resourcePath", currentPath);
+            metadata.put("hostName", hostName);
+            this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
+                    notification.getTenantId(), genericResource.getResourceId(), user, metadata);
+
+            resourceList.add(genericResource);
         } else {
             logger.error("Could not create a resource for path {}", currentPath);
             throw new Exception("Could not create a resource for path " + currentPath);
@@ -175,11 +190,13 @@ public class OrchestratorEventProcessor implements Runnable {
             TransferMapping transferMapping = optionalTransferMapping.get();
 
             String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
+            String sourceHostName = transferMapping.getSourceStorage().getSshStorage().getHostName();
             String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
+            String destinationHostName = transferMapping.getDestinationStorage().getSshStorage().getHostName();
 
             // Creating parent resource
 
-            List<GenericResource> resourceList = createResourceRecursively(sourceStorageId,
+            List<GenericResource> resourceList = createResourceRecursively(sourceHostName, sourceStorageId,
                     notification.getBasePath(),
                     notification.getResourcePath(),
                     "COLLECTION", adminUser);
@@ -258,7 +275,7 @@ public class OrchestratorEventProcessor implements Runnable {
             List<String> resourceIDsToProcess = new ArrayList<>();
             for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
                 logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
-                resourceList = createResourceRecursively(sourceStorageId, notification.getBasePath(),
+                resourceList = createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
                         fileMetadata.getResourcePath(), "FILE", adminUser);
                 GenericResource fileResource = resourceList.get(resourceList.size() - 1);
 
@@ -267,14 +284,14 @@ public class OrchestratorEventProcessor implements Runnable {
 
             for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
                 logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
-                createResourceRecursively(sourceStorageId, notification.getBasePath(),
+                createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
                         directoryMetadata.getResourcePath(),
                         "COLLECTION", adminUser);
                 // TODO scan directories
             }
 
             logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
-            resourceList = createResourceRecursively(destinationStorageId, notification.getBasePath(),
+            resourceList = createResourceRecursively(destinationHostName, destinationStorageId, notification.getBasePath(),
                     notification.getResourcePath(), "FILE", adminUser);
 
             GenericResource destinationResource = resourceList.get(resourceList.size() - 1);