You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/10/13 01:40:02 UTC

[airavata-data-lake] branch master updated: Optimizations to speedup resource registration + scanning internal directories

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new 8227ff7  Optimizations to speedup resource registration + scanning internal directories
8227ff7 is described below

commit 8227ff7c735be46fb182af688ea6bc753df9a0af
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Oct 12 21:39:51 2021 -0400

    Optimizations to speedup resource registration + scanning internal directories
---
 .../handlers/async/OrchestratorEventProcessor.java | 120 ++++++++++++---------
 1 file changed, 72 insertions(+), 48 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index a62b8ea..adfdc53 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -62,8 +62,9 @@ public class OrchestratorEventProcessor implements Runnable {
         this.notificationClient = notificationClient;
     }
 
-    private List<GenericResource> createResourceRecursively(String hostName, String storageId, String basePath,
-                                                            String resourcePath, String resourceType, String user)
+    private List<GenericResource> createResourceWithParentDirectories(String hostName, String storageId, String basePath,
+                                                                      String resourcePath, String resourceType, String user,
+                                                                      Map<String, GenericResource> resourceCache)
             throws Exception {
 
         List<GenericResource> resourceList = new ArrayList<>();
@@ -77,6 +78,12 @@ public class OrchestratorEventProcessor implements Runnable {
         for (int i = 0; i < splitted.length - 1; i++) {
             String resourceName = splitted[i];
             currentPath = currentPath + "/" + resourceName;
+
+            if (resourceCache.containsKey(currentPath)) {
+                resourceList.add(resourceCache.get(currentPath));
+                continue;
+            }
+
             String resourceId = Utils.getId(storageId + ":" + currentPath);
             Optional<GenericResource> optionalGenericResource =
                     this.drmsConnector.createResource(notification.getAuthToken(),
@@ -92,6 +99,7 @@ public class OrchestratorEventProcessor implements Runnable {
                 this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
                         notification.getTenantId(), parentId, user, parentType, metadata);
 
+                resourceCache.put(currentPath, optionalGenericResource.get());
                 resourceList.add(optionalGenericResource.get());
             } else {
                 logger.error("Could not create a resource for path {}", currentPath);
@@ -150,6 +158,7 @@ public class OrchestratorEventProcessor implements Runnable {
         logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
                 notification.getBasePath());
 
+        Map<String, GenericResource> resourceCache = new HashMap<>();
         try {
 
             this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
@@ -196,10 +205,10 @@ public class OrchestratorEventProcessor implements Runnable {
 
             // Creating parent resource
 
-            List<GenericResource> resourceList = createResourceRecursively(sourceHostName, sourceStorageId,
+            List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
                     notification.getBasePath(),
                     notification.getResourcePath(),
-                    "COLLECTION", adminUser);
+                    "COLLECTION", adminUser, resourceCache);
 
             shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size() - 1)),
                     adminUser, owner, "VIEWER");
@@ -245,54 +254,15 @@ public class OrchestratorEventProcessor implements Runnable {
                     .putProperties("TENANT_ID", notification.getTenantId()).build();
 
             AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
-
-            FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
-                    .setMftAuthorizationToken(mftAuth)
-                    .setResourceId(resourceObj.getResourceId());
-
-            switch (sourceSP.getStorageCase()) {
-                case SSH_STORAGE_PREFERENCE:
-                    resourceMetadataReq.setResourceType("SCP");
-                    resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
-                    break;
-                case S3_STORAGE_PREFERENCE:
-                    resourceMetadataReq.setResourceType("S3");
-                    resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
-                    break;
-            }
-
-            // Fetching file list for parent resource
-
-            DirectoryMetadataResponse directoryResourceMetadata;
-
-            try (MFTApiClient mftApiClient = new MFTApiClient(
-                    this.configuration.getOutboundEventProcessor().getMftHost(),
-                    this.configuration.getOutboundEventProcessor().getMftPort())) {
-                MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
-                directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
-            }
-
             List<String> resourceIDsToProcess = new ArrayList<>();
-            for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
-                logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
-                resourceList = createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
-                        fileMetadata.getResourcePath(), "FILE", adminUser);
-                GenericResource fileResource = resourceList.get(resourceList.size() - 1);
 
-                resourceIDsToProcess.add(fileResource.getResourceId());
-            }
-
-            for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
-                logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
-                createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
-                        directoryMetadata.getResourcePath(),
-                        "COLLECTION", adminUser);
-                // TODO scan directories
-            }
+            // Fetching file list for parent resource
+            scanResourceForChildResources(resourceObj, mftAuth, sourceSP, sourceStorageId, sourceHostName,
+                    adminUser, resourceIDsToProcess, resourceCache, 4);
 
             logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
-            resourceList = createResourceRecursively(destinationHostName, destinationStorageId, notification.getBasePath(),
-                    notification.getResourcePath(), "FILE", adminUser);
+            resourceList = createResourceWithParentDirectories(destinationHostName, destinationStorageId, notification.getBasePath(),
+                    notification.getResourcePath(), "FILE", adminUser, resourceCache);
 
             GenericResource destinationResource = resourceList.get(resourceList.size() - 1);
 
@@ -328,4 +298,58 @@ public class OrchestratorEventProcessor implements Runnable {
             this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
         }
     }
+
+    private void scanResourceForChildResources(GenericResource resourceObj, AuthToken mftAuth, AnyStoragePreference sourceSP,
+                                               String sourceStorageId, String sourceHostName, String adminUser,
+                                               List<String> resourceIDsToProcess, Map<String, GenericResource> resourceCache,
+                                               int scanDepth)
+            throws Exception {
+
+        FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
+                .setMftAuthorizationToken(mftAuth)
+                .setResourceId(resourceObj.getResourceId());
+
+        switch (sourceSP.getStorageCase()) {
+            case SSH_STORAGE_PREFERENCE:
+                resourceMetadataReq.setResourceType("SCP");
+                resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
+                break;
+            case S3_STORAGE_PREFERENCE:
+                resourceMetadataReq.setResourceType("S3");
+                resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
+                break;
+        }
+
+        DirectoryMetadataResponse directoryResourceMetadata;
+
+        try (MFTApiClient mftApiClient = new MFTApiClient(
+                this.configuration.getOutboundEventProcessor().getMftHost(),
+                this.configuration.getOutboundEventProcessor().getMftPort())) {
+            MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
+            directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
+        }
+
+        for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
+            logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
+            List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
+                    fileMetadata.getResourcePath(), "FILE", adminUser, resourceCache);
+            GenericResource fileResource = resourceList.get(resourceList.size() - 1);
+
+            resourceIDsToProcess.add(fileResource.getResourceId());
+        }
+
+        for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
+            logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
+            List<GenericResource> createResources = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
+                    directoryMetadata.getResourcePath(),
+                    "COLLECTION", adminUser, resourceCache);
+            GenericResource dirResource = createResources.get(createResources.size() - 1);
+
+            if (scanDepth > 0) {
+                // Scanning the directories recursively
+                scanResourceForChildResources(dirResource, mftAuth, sourceSP, sourceStorageId, sourceHostName, adminUser,
+                        resourceIDsToProcess, resourceCache, scanDepth - 1);
+            }
+        }
+    }
 }