You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/09 15:42:47 UTC
[airavata-data-lake] branch master updated: Adding host name and
resource path for all resources as metadata
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new e665320 Adding host name and resource path for all resources as metadata
e665320 is described below
commit e66532090690b0f0155619a8113c49f8eb5c28fe
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Sep 9 11:42:06 2021 -0400
Adding host name and resource path for all resources as metadata
---
.../orchestrator/connectors/DRMSConnector.java | 30 ++++++++++++++++++++++
.../handlers/async/OrchestratorEventProcessor.java | 29 ++++++++++++++++-----
2 files changed, 53 insertions(+), 6 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index 350aee8..d8ee739 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -1,5 +1,7 @@
package org.apache.airavata.datalake.orchestrator.connectors;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Value;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.datalake.drms.AuthCredentialType;
@@ -16,6 +18,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
@@ -174,6 +177,33 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
}
}
+ public void addResourceMetadata(String authToken,
+ String tenantId,
+ String resourceId,
+ String user,
+ Map<String, String> metadata) {
+
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(authToken)
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(user)
+ .setTenantId(tenantId)
+ .build())
+ .build();
+
+ Struct.Builder structBuilder = Struct.newBuilder();
+ metadata.forEach((key, value) -> structBuilder.putFields(key,
+ Value.newBuilder().setStringValue(value).build()));
+
+ resourceServiceBlockingStub.addResourceMetadata(AddResourceMetadataRequest.newBuilder()
+ .setResourceId(resourceId)
+ .setAuthToken(serviceAuthToken)
+ .setType("FILE")
+ .setMetadata(structBuilder.build()).build());
+ }
+
+
public Optional<AnyStoragePreference> getStoragePreference(String authToken, String username, String tenantId, String storageId) {
DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
.setAccessToken(authToken)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 1d20f59..c7a6e1b 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -62,7 +62,7 @@ public class OrchestratorEventProcessor implements Runnable {
this.notificationClient = notificationClient;
}
- private List<GenericResource> createResourceRecursively(String storageId, String basePath,
+ private List<GenericResource> createResourceRecursively(String hostName, String storageId, String basePath,
String resourcePath, String resourceType, String user)
throws Exception {
@@ -84,6 +84,13 @@ public class OrchestratorEventProcessor implements Runnable {
resourceId, resourceName, currentPath, parentId, "COLLECTION", parentType, user);
if (optionalGenericResource.isPresent()) {
parentId = optionalGenericResource.get().getResourceId();
+
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put("resourcePath", currentPath);
+ metadata.put("hostName", hostName);
+ this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
+ notification.getTenantId(), parentId, user, metadata);
+
parentType = "COLLECTION";
resourceList.add(optionalGenericResource.get());
} else {
@@ -102,7 +109,15 @@ public class OrchestratorEventProcessor implements Runnable {
parentId, resourceType, parentType, user);
if (optionalGenericResource.isPresent()) {
- resourceList.add(optionalGenericResource.get());
+ GenericResource genericResource = optionalGenericResource.get();
+
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put("resourcePath", currentPath);
+ metadata.put("hostName", hostName);
+ this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
+ notification.getTenantId(), genericResource.getResourceId(), user, metadata);
+
+ resourceList.add(genericResource);
} else {
logger.error("Could not create a resource for path {}", currentPath);
throw new Exception("Could not create a resource for path " + currentPath);
@@ -175,11 +190,13 @@ public class OrchestratorEventProcessor implements Runnable {
TransferMapping transferMapping = optionalTransferMapping.get();
String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
+ String sourceHostName = transferMapping.getSourceStorage().getSshStorage().getHostName();
String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
+ String destinationHostName = transferMapping.getDestinationStorage().getSshStorage().getHostName();
// Creating parent resource
- List<GenericResource> resourceList = createResourceRecursively(sourceStorageId,
+ List<GenericResource> resourceList = createResourceRecursively(sourceHostName, sourceStorageId,
notification.getBasePath(),
notification.getResourcePath(),
"COLLECTION", adminUser);
@@ -258,7 +275,7 @@ public class OrchestratorEventProcessor implements Runnable {
List<String> resourceIDsToProcess = new ArrayList<>();
for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
- resourceList = createResourceRecursively(sourceStorageId, notification.getBasePath(),
+ resourceList = createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
fileMetadata.getResourcePath(), "FILE", adminUser);
GenericResource fileResource = resourceList.get(resourceList.size() - 1);
@@ -267,14 +284,14 @@ public class OrchestratorEventProcessor implements Runnable {
for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
- createResourceRecursively(sourceStorageId, notification.getBasePath(),
+ createResourceRecursively(sourceHostName, sourceStorageId, notification.getBasePath(),
directoryMetadata.getResourcePath(),
"COLLECTION", adminUser);
// TODO scan directories
}
logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
- resourceList = createResourceRecursively(destinationStorageId, notification.getBasePath(),
+ resourceList = createResourceRecursively(destinationHostName, destinationStorageId, notification.getBasePath(),
notification.getResourcePath(), "FILE", adminUser);
GenericResource destinationResource = resourceList.get(resourceList.size() - 1);