You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/10/13 01:40:02 UTC
[airavata-data-lake] branch master updated: Optimizations to speed up resource registration + scanning internal directories
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new 8227ff7 Optimizations to speed up resource registration + scanning internal directories
8227ff7 is described below
commit 8227ff7c735be46fb182af688ea6bc753df9a0af
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Oct 12 21:39:51 2021 -0400
Optimizations to speed up resource registration + scanning internal directories
---
.../handlers/async/OrchestratorEventProcessor.java | 120 ++++++++++++---------
1 file changed, 72 insertions(+), 48 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index a62b8ea..adfdc53 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -62,8 +62,9 @@ public class OrchestratorEventProcessor implements Runnable {
this.notificationClient = notificationClient;
}
- private List<GenericResource> createResourceRecursively(String hostName, String storageId, String basePath,
- String resourcePath, String resourceType, String user)
+ private List<GenericResource> createResourceWithParentDirectories(String hostName, String storageId, String basePath,
+ String resourcePath, String resourceType, String user,
+ Map<String, GenericResource> resourceCache)
throws Exception {
List<GenericResource> resourceList = new ArrayList<>();
@@ -77,6 +78,12 @@ public class OrchestratorEventProcessor implements Runnable {
for (int i = 0; i < splitted.length - 1; i++) {
String resourceName = splitted[i];
currentPath = currentPath + "/" + resourceName;
+
+ if (resourceCache.containsKey(currentPath)) {
+ resourceList.add(resourceCache.get(currentPath));
+ continue;
+ }
+
String resourceId = Utils.getId(storageId + ":" + currentPath);
Optional<GenericResource> optionalGenericResource =
this.drmsConnector.createResource(notification.getAuthToken(),
@@ -92,6 +99,7 @@ public class OrchestratorEventProcessor implements Runnable {
this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
notification.getTenantId(), parentId, user, parentType, metadata);
+ resourceCache.put(currentPath, optionalGenericResource.get());
resourceList.add(optionalGenericResource.get());
} else {
logger.error("Could not create a resource for path {}", currentPath);
@@ -150,6 +158,7 @@ public class OrchestratorEventProcessor implements Runnable {
logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
notification.getBasePath());
+ Map<String, GenericResource> resourceCache = new HashMap<>();
try {
this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
@@ -196,10 +205,10 @@ public class OrchestratorEventProcessor implements Runnable {
// Creating parent resource
- List<GenericResource> resourceList = createResourceRecursively(sourceHostName, sourceStorageId,
+ List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
notification.getBasePath(),
notification.getResourcePath(),
- "COLLECTION", adminUser);
+ "COLLECTION", adminUser, resourceCache);
shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size() - 1)),
adminUser, owner, "VIEWER");
@@ -245,54 +254,15 @@ public class OrchestratorEventProcessor implements Runnable {
.putProperties("TENANT_ID", notification.getTenantId()).build();
AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
-
- FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
- .setMftAuthorizationToken(mftAuth)
- .setResourceId(resourceObj.getResourceId());
-
- switch (sourceSP.getStorageCase()) {
- case SSH_STORAGE_PREFERENCE:
- resourceMetadataReq.setResourceType("SCP");
- resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
- break;
- case S3_STORAGE_PREFERENCE:
- resourceMetadataReq.setResourceType("S3");
- resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
- break;
- }
-
- // Fetching file list for parent resource
-
- DirectoryMetadataResponse directoryResourceMetadata;
-
- try (MFTApiClient mftApiClient = new MFTApiClient(
- this.configuration.getOutboundEventProcessor().getMftHost(),
- this.configuration.getOutboundEventProcessor().getMftPort())) {
- MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
- directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
- }
-
List<String> resourceIDsToProcess = new ArrayList<>();
- for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
- logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
- resourceList = createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
- fileMetadata.getResourcePath(), "FILE", adminUser);
- GenericResource fileResource = resourceList.get(resourceList.size() - 1);
- resourceIDsToProcess.add(fileResource.getResourceId());
- }
-
- for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
- logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
- createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
- directoryMetadata.getResourcePath(),
- "COLLECTION", adminUser);
- // TODO scan directories
- }
+ // Fetching file list for parent resource
+ scanResourceForChildResources(resourceObj, mftAuth, sourceSP, sourceStorageId, sourceHostName,
+ adminUser, resourceIDsToProcess, resourceCache, 4);
logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
- resourceList = createResourceRecursively(destinationHostName, destinationStorageId, notification.getBasePath(),
- notification.getResourcePath(), "FILE", adminUser);
+ resourceList = createResourceWithParentDirectories(destinationHostName, destinationStorageId, notification.getBasePath(),
+ notification.getResourcePath(), "FILE", adminUser, resourceCache);
GenericResource destinationResource = resourceList.get(resourceList.size() - 1);
@@ -328,4 +298,58 @@ public class OrchestratorEventProcessor implements Runnable {
this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
}
}
+
+ private void scanResourceForChildResources(GenericResource resourceObj, AuthToken mftAuth, AnyStoragePreference sourceSP,
+ String sourceStorageId, String sourceHostName, String adminUser,
+ List<String> resourceIDsToProcess, Map<String, GenericResource> resourceCache,
+ int scanDepth)
+ throws Exception {
+
+ FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
+ .setMftAuthorizationToken(mftAuth)
+ .setResourceId(resourceObj.getResourceId());
+
+ switch (sourceSP.getStorageCase()) {
+ case SSH_STORAGE_PREFERENCE:
+ resourceMetadataReq.setResourceType("SCP");
+ resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
+ break;
+ case S3_STORAGE_PREFERENCE:
+ resourceMetadataReq.setResourceType("S3");
+ resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
+ break;
+ }
+
+ DirectoryMetadataResponse directoryResourceMetadata;
+
+ try (MFTApiClient mftApiClient = new MFTApiClient(
+ this.configuration.getOutboundEventProcessor().getMftHost(),
+ this.configuration.getOutboundEventProcessor().getMftPort())) {
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
+ directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
+ }
+
+ for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
+ logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
+ List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
+ fileMetadata.getResourcePath(), "FILE", adminUser, resourceCache);
+ GenericResource fileResource = resourceList.get(resourceList.size() - 1);
+
+ resourceIDsToProcess.add(fileResource.getResourceId());
+ }
+
+ for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
+ logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
+ List<GenericResource> createResources = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
+ directoryMetadata.getResourcePath(),
+ "COLLECTION", adminUser, resourceCache);
+ GenericResource dirResource = createResources.get(createResources.size() - 1);
+
+ if (scanDepth > 0) {
+ // Scanning the directories recursively
+ scanResourceForChildResources(dirResource, mftAuth, sourceSP, sourceStorageId, sourceHostName, adminUser,
+ resourceIDsToProcess, resourceCache, scanDepth - 1);
+ }
+ }
+ }
}