You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/07 04:53:58 UTC
[airavata-data-lake] branch master updated: complete transfer
mapping, storage source and destination detection,
collection herachy creation from path
This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new f2c2ac0 complete transfer mapping, storage source and destination detection, collection herachy creation from path
new d48c786 Merge pull request #16 from isururanawaka/workflow_merge
f2c2ac0 is described below
commit f2c2ac0b293814de80afde9e91f6ebef700a8069
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Wed Jul 7 00:52:57 2021 -0400
complete transfer mapping, storage source and destination detection, collection herachy creation from path
---
.../file/client/model/Configuration.java | 50 +++----
.../file/client/watcher/FileWatcher.java | 2 +-
.../src/main/resources/application.properties | 2 +-
.../src/main/resources/config.yml | 2 +-
.../messaging/model/NotificationEvent.java | 20 +--
.../model/NotificationEventDeserializer.java | 2 +-
.../model/NotificationEventSerializer.java | 2 +-
.../persistance/DataOrchestratorEntity.java | 10 +-
.../airavata/datalake/orchestrator/Utils.java | 29 ++++
.../orchestrator/connectors/DRMSConnector.java | 159 +++++++++++++++++++++
.../connectors/WorkflowServiceConnector.java | 72 ++++++++++
.../db/inmemory/DefaultInMemoryStore.java | 74 ----------
.../processor/InboundEventProcessor.java | 33 +----
.../processor/OutboundEventProcessor.java | 146 +++++++------------
.../datalake/orchestrator/core/AbstractTask.java | 26 ----
.../orchestrator/core/adaptors/StorageAdaptor.java | 17 ---
.../core/connector/AbstractConnector.java | 14 ++
.../core/processor/MessageProcessor.java | 4 +-
.../drms/api/handlers/ResourceServiceHandler.java | 17 +--
.../drms-rest-proxy/src/main/resources/drms.pb | Bin 105597 -> 109380 bytes
.../src/main/proto/resource/DRMSResource.proto | 1 +
21 files changed, 381 insertions(+), 301 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
index 03aeba8..630db9c 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
@@ -2,7 +2,7 @@ package org.apache.airavata.dataorchestrator.file.client.model;
public class Configuration {
private String listeningPath;
- private String storagePreferenceId;
+ private String hostName;
private Producer producer;
@@ -11,12 +11,12 @@ public class Configuration {
public Configuration() {
}
- public String getStoragePreferenceId() {
- return storagePreferenceId;
+ public String getHostName() {
+ return hostName;
}
- public void setStoragePreferenceId(String storagePreferenceId) {
- this.storagePreferenceId = storagePreferenceId;
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
}
public String getListeningPath() {
@@ -73,36 +73,36 @@ public class Configuration {
}
}
- public static class Custos {
+ public static class Custos {
private String serviceAccountId;
private String serviceAccountSecret;
private String tenantId;
- public String getServiceAccountId() {
- return serviceAccountId;
- }
+ public String getServiceAccountId() {
+ return serviceAccountId;
+ }
- public void setServiceAccountId(String serviceAccountId) {
- this.serviceAccountId = serviceAccountId;
- }
+ public void setServiceAccountId(String serviceAccountId) {
+ this.serviceAccountId = serviceAccountId;
+ }
- public String getServiceAccountSecret() {
- return serviceAccountSecret;
- }
+ public String getServiceAccountSecret() {
+ return serviceAccountSecret;
+ }
- public void setServiceAccountSecret(String serviceAccountSecret) {
- this.serviceAccountSecret = serviceAccountSecret;
- }
+ public void setServiceAccountSecret(String serviceAccountSecret) {
+ this.serviceAccountSecret = serviceAccountSecret;
+ }
- public String getTenantId() {
- return tenantId;
- }
+ public String getTenantId() {
+ return tenantId;
+ }
- public void setTenantId(String tenantId) {
- this.tenantId = tenantId;
- }
- }
+ public void setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ }
+ }
}
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
index d7870c2..9e16018 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
@@ -167,7 +167,7 @@ public class FileWatcher implements Runnable {
+ ":" + configuration.getCustos().getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)));
context.setBasePath(configuration.getListeningPath());
context.setTenantId(configuration.getCustos().getTenantId());
- context.setStoragePreferenceId(configuration.getStoragePreferenceId());
+ context.setHostName(configuration.getHostName());
event.setContext(context);
return event;
}
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties
index 8c8bbf1..6e03154 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties
@@ -1 +1 @@
-config.path=/Users/dimuthu/code/airavata-data-lake/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
\ No newline at end of file
+config.path=/Users/isururanawaka/Documents/Airavata_Repository/airavata-data-lake/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
\ No newline at end of file
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
index 5c1d0ac..cae9b9f 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
@@ -1,5 +1,5 @@
listeningPath: "/Users/isururanawaka/Documents/texts"
-storagePreferenceId:""
+hostName: "beta.iubemcenter.scigap.org"
producer:
brokerURL: "localhost:9092"
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
index fc4a09a..1cda9db 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
@@ -68,7 +68,7 @@ public class NotificationEvent {
private Long occuredTime;
private String authToken;
private String tenantId;
- private String storagePreferenceId;
+ private String hostName;
private String basePath;
@@ -104,14 +104,6 @@ public class NotificationEvent {
this.tenantId = tenantId;
}
- public String getStoragePreferenceId() {
- return storagePreferenceId;
- }
-
- public void setStoragePreferenceId(String storagePreferenceId) {
- this.storagePreferenceId = storagePreferenceId;
- }
-
public String getBasePath() {
return basePath;
}
@@ -119,10 +111,18 @@ public class NotificationEvent {
public void setBasePath(String basePath) {
this.basePath = basePath;
}
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
}
public String getResourceId() {
- return context.storagePreferenceId + ":" + resourcePath + ":" + resourceType;
+ return context.hostName+ ":" + resourcePath + ":" + resourceType;
}
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index 5fddbcd..f19d272 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -25,7 +25,7 @@ public class NotificationEventDeserializer implements Deserializer<NotificationE
context.setOccuredTime(Long.valueOf(parts[2]));
context.setAuthToken(String.valueOf(parts[3]));
context.setTenantId(String.valueOf(parts[4]));
- context.setStoragePreferenceId(parts[5]);
+ context.setHostName(parts[5]);
context.setBasePath(parts[6]);
event.setResourcePath(parts[7]);
event.setResourceType(parts[8]);
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index e71ee83..7a55770 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -20,7 +20,7 @@ public class NotificationEventSerializer implements Serializer<NotificationEvent
notificationEvent.getContext().getOccuredTime() + "," +
notificationEvent.getContext().getAuthToken() + "," +
notificationEvent.getContext().getTenantId() + "," +
- notificationEvent.getContext().getStoragePreferenceId()+ "," +
+ notificationEvent.getContext().getHostName() + "," +
notificationEvent.getContext().getBasePath() + "," +
notificationEvent.getResourcePath() + "," +
notificationEvent.getResourceType() + "," +
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java
index 3815f08..562690e 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java
@@ -65,7 +65,7 @@ public class DataOrchestratorEntity {
private String authToken;
@Column(nullable = false)
- private String storagePreferenceId;
+ private String hostName;
@Lob
private String error;
@@ -202,11 +202,11 @@ public class DataOrchestratorEntity {
this.authToken = authToken;
}
- public String getStoragePreferenceId() {
- return storagePreferenceId;
+ public String getHostName() {
+ return hostName;
}
- public void setStoragePreferenceId(String storagePreferenceId) {
- this.storagePreferenceId = storagePreferenceId;
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
}
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
new file mode 100644
index 0000000..420c042
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.datalake.orchestrator;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class Utils {
+
+ public static String getId(String message) throws NoSuchAlgorithmException {
+ MessageDigest md = MessageDigest.getInstance("SHA-256");
+ // digest() method called
+ // to calculate message digest of an input
+ // and return array of byte
+ byte[] array = md.digest(message.getBytes(StandardCharsets.UTF_8));
+ // Convert byte array into signum representation
+ BigInteger number = new BigInteger(1, array);
+
+ // Convert message digest into hex value
+ StringBuilder hexString = new StringBuilder(number.toString(16));
+
+ // Pad with leading zeros
+ while (hexString.length() < 32) {
+ hexString.insert(0, '0');
+ }
+
+ return hexString.toString();
+ }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
new file mode 100644
index 0000000..d76ffa6
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -0,0 +1,159 @@
+package org.apache.airavata.datalake.orchestrator.connectors;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * DRMS connector to connect with DRMS service
+ */
+public class DRMSConnector implements AbstractConnector<Configuration> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DRMSConnector.class);
+
+ private ManagedChannel drmsChannel;
+ private ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub;
+ private StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub storagePreferenceServiceBlockingStub;
+
+ public DRMSConnector(Configuration configuration) throws Exception {
+ this.init(configuration);
+ }
+
+ @Override
+ public void init(Configuration configuration) throws Exception {
+ this.drmsChannel = ManagedChannelBuilder
+ .forAddress(configuration.getOutboundEventProcessor().getDrmsHost(),
+ configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
+ this.resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(drmsChannel);
+ this.storagePreferenceServiceBlockingStub = StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.drmsChannel.shutdown();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !this.drmsChannel.isShutdown();
+ }
+
+ public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(entity.getAuthToken())
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(entity.getOwnerId())
+ .setTenantId(entity.getTenantId())
+ .build())
+ .build();
+ FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .build();
+ FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+ List<TransferMapping> transferMappingList = response.getMappingsList();
+ AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+ if (!transferMappingList.isEmpty()) {
+ transferMappingList.forEach(transferMapping -> {
+ if (transferMapping.getSourceStoragePreference().getStorageCase()
+ .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+ if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
+ .getStorage().getHostName().equals(hostname)) {
+ storagePreferenceId
+ .set(transferMapping.getSourceStoragePreference()
+ .getSshStoragePreference().getStoragePreferenceId());
+ }
+ }
+ });
+ }
+ return Optional.ofNullable(storagePreferenceId.get());
+ }
+
+ public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(entity.getAuthToken())
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(entity.getOwnerId())
+ .setTenantId(entity.getTenantId())
+ .build())
+ .build();
+ FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .build();
+ FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+ List<TransferMapping> transferMappingList = response.getMappingsList();
+ AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+ if (!transferMappingList.isEmpty()) {
+ transferMappingList.forEach(transferMapping -> {
+ if (transferMapping.getDestinationStoragePreference().getStorageCase()
+ .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+ if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
+ .getStorage().getHostName().equals(hostname)) {
+ storagePreferenceId
+ .set(transferMapping.getDestinationStoragePreference()
+ .getSshStoragePreference().getStoragePreferenceId());
+ }
+ }
+ });
+ }
+ return Optional.ofNullable(storagePreferenceId.get());
+ }
+
+
+ public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
+ String resourceId,
+ String resourceName,
+ String resourcePath,
+ String parentId,
+ String type) {
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(entity.getAuthToken())
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(entity.getOwnerId())
+ .setTenantId(entity.getTenantId())
+ .build())
+ .build();
+
+ GenericResource genericResource = GenericResource
+ .newBuilder()
+ .setResourceId(resourceId)
+ .setResourceName(resourceName)
+ .setResourcePath(resourcePath)
+ .setType(type)
+ .setParentId(parentId).build();
+ ResourceCreateRequest resourceCreateRequest = ResourceCreateRequest
+ .newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .setResource(genericResource)
+ .build();
+
+ try {
+ ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
+ return Optional.ofNullable(resourceCreateResponse.getResource());
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
+ repository.save(entity);
+ return Optional.empty();
+ }
+ }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
new file mode 100644
index 0000000..d3cbc5e
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
@@ -0,0 +1,72 @@
+package org.apache.airavata.datalake.orchestrator.connectors;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
+import org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Drms connector to call DRMS services
+ */
+
+public class WorkflowServiceConnector implements AbstractConnector<Configuration> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DRMSConnector.class);
+
+ private ManagedChannel workflowChannel;
+ private WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceStub;
+
+
+ public WorkflowServiceConnector(Configuration configuration) throws Exception {
+ this.init(configuration);
+ }
+
+ @Override
+ public void init(Configuration configuration) throws Exception {
+ this.workflowChannel = ManagedChannelBuilder
+ .forAddress(configuration.getOutboundEventProcessor().getWorkflowEngineHost(),
+ configuration.getOutboundEventProcessor().getWorkflowPort()).usePlaintext().build();
+ this.workflowServiceStub = WorkflowServiceGrpc.newBlockingStub(workflowChannel);
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ public void invokeWorkflow(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity, GenericResource resource) {
+ try {
+ WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken
+ .newBuilder()
+ .setAccessToken("")
+ .build();
+ WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
+ .setResourceId(resource.getResourceId())
+ .build();
+
+ WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
+ .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
+ this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while invoking workflow engine", entity.getResourceId(), ex);
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ entity.setError("Error occurred while invoking workflow engine" + ex.getMessage());
+ repository.save(entity);
+ return;
+ }
+ }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/db/inmemory/DefaultInMemoryStore.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/db/inmemory/DefaultInMemoryStore.java
deleted file mode 100644
index 7630e47..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/db/inmemory/DefaultInMemoryStore.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.db.inmemory;
-
-import org.apache.airavata.datalake.orchestrator.core.adaptors.StorageAdaptor;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Stack;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Default in memory store that stores all transient events
- */
-public class DefaultInMemoryStore implements StorageAdaptor<NotificationEvent> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(DefaultInMemoryStore.class);
-
- private static final ConcurrentHashMap<String, Stack<NotificationEvent>> inmemoryStore = new ConcurrentHashMap();
- private static final ConcurrentLinkedQueue<String> resourceIdQueue = new ConcurrentLinkedQueue<>();
-
-
- @Override
- public void save(NotificationEvent object) {
- inmemoryStore.computeIfAbsent(object.getResourceId(), k -> new Stack<NotificationEvent>()).push(object);
- resourceIdQueue.add(object.getResourceId());
- }
-
- @Override
- public void delete(String id) {
- inmemoryStore.computeIfPresent(id, (k, v) -> {
- v.remove(id);
- return v;
- });
- resourceIdQueue.remove(id);
- }
-
- @Override
- public NotificationEvent get(String id) {
- return inmemoryStore.get(id).get(0);
- }
-
- @Override
- public NotificationEvent update(NotificationEvent object) throws UnsupportedOperationException {
- throw new UnsupportedOperationException("Update events are not supported in default inmemory store");
- }
-
- @Override
- public List<NotificationEvent> poll(int numOfEvents) {
- List<NotificationEvent> notificationEventList = new ArrayList<>();
- AtomicInteger count = new AtomicInteger(1);
- int iterations = 0;
- while (count.get() <= numOfEvents) {
- String value = resourceIdQueue.poll();
- List<NotificationEvent> finalNotificationEventList = notificationEventList;
- Optional.ofNullable(value).ifPresent(val-> {
- Stack events = inmemoryStore.remove(val);
- count.set(count.get() + events.size());
- finalNotificationEventList.addAll(events);
- });
- iterations++;
- if (iterations > 100) {
- break;
- }
- }
- return notificationEventList;
- }
-
-
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
index 92fc28b..faddd61 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
@@ -1,6 +1,7 @@
package org.apache.airavata.datalake.orchestrator.processor;
import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
@@ -11,9 +12,7 @@ import org.dozer.loader.api.BeanMappingBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
@@ -24,7 +23,7 @@ import java.util.regex.Pattern;
/**
* This class is responsible for pick events from kafka queue and publish them into inmemory store
*/
-public class InboundEventProcessor implements MessageProcessor {
+public class InboundEventProcessor implements MessageProcessor<Configuration> {
private static final Logger LOGGER = LoggerFactory.getLogger(InboundEventProcessor.class);
private Configuration configuration;
private NotificationEvent notificationEvent;
@@ -34,15 +33,15 @@ public class InboundEventProcessor implements MessageProcessor {
public InboundEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
DataOrchestratorEventRepository repository) throws Exception {
- this.configuration = configuration;
this.notificationEvent = notificationEvent;
this.repository = repository;
- this.init();
+ this.init(configuration);
}
@Override
- public void init() throws Exception {
+ public void init(Configuration configuration) throws Exception {
try {
+ this.configuration = configuration;
dozerBeanMapper = new DozerBeanMapper();
BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
@Override
@@ -109,7 +108,7 @@ public class InboundEventProcessor implements MessageProcessor {
entity.setEventStatus(EventStatus.DATA_ORCH_RECEIVED.name());
entity.setEventType(event.getContext().getEvent().name());
entity.setAuthToken(event.getContext().getAuthToken());
- entity.setStoragePreferenceId(event.getContext().getStoragePreferenceId());
+ entity.setHostName(event.getContext().getHostName());
String resourcePath = event.getResourcePath();
String basePath = event.getContext().getBasePath();
@@ -124,27 +123,9 @@ public class InboundEventProcessor implements MessageProcessor {
.decode(event.getContext().getAuthToken().getBytes(StandardCharsets.UTF_8)));
String agentId = authDecoded.split(":")[0];
entity.setAgentId(agentId);
- entity.setResourceId(getResourceId(event.getResourceId()));
+ entity.setResourceId(Utils.getId(event.getResourceId()));
return entity;
}
- private String getResourceId(String message) throws NoSuchAlgorithmException {
- MessageDigest md = MessageDigest.getInstance("SHA-256");
- // digest() method called
- // to calculate message digest of an input
- // and return array of byte
- byte[] array = md.digest(message.getBytes(StandardCharsets.UTF_8));
- // Convert byte array into signum representation
- BigInteger number = new BigInteger(1, array);
-
- // Convert message digest into hex value
- StringBuilder hexString = new StringBuilder(number.toString(16));
-
- // Pad with leading zeros
- while (hexString.length() < 32) {
- hexString.insert(0, '0');
- }
- return hexString.toString();
- }
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 7c9e7cc..8038061 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -1,64 +1,44 @@
package org.apache.airavata.datalake.orchestrator.processor;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import org.apache.airavata.datalake.drms.AuthCredentialType;
-import org.apache.airavata.datalake.drms.AuthenticatedUser;
-import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
import org.apache.airavata.datalake.drms.resource.GenericResource;
-import org.apache.airavata.datalake.drms.storage.*;
import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
+import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
+import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
import org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
-import org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import org.dozer.DozerBeanMapper;
import org.dozer.loader.api.BeanMappingBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* This class is responsible and publish events to registry and
* Workflow engine
*/
-public class OutboundEventProcessor implements MessageProcessor {
+public class OutboundEventProcessor implements MessageProcessor<Configuration> {
private static final Logger LOGGER = LoggerFactory.getLogger(OutboundEventProcessor.class);
private DozerBeanMapper dozerBeanMapper;
private DataOrchestratorEventRepository repository;
- private final ManagedChannel workflowChannel;
- private final ManagedChannel drmsChannel;
- private final WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceStub;
- private final ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub;
- private final StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub storagePreferenceServiceBlockingStub;
+ private DRMSConnector drmsConnector;
+ private WorkflowServiceConnector workflowServiceConnector;
public OutboundEventProcessor(Configuration configuration, DataOrchestratorEventRepository repository) throws Exception {
this.repository = repository;
- //convert these to SSL
- this.workflowChannel = ManagedChannelBuilder
- .forAddress(configuration.getOutboundEventProcessor().getWorkflowEngineHost(),
- configuration.getOutboundEventProcessor().getWorkflowPort()).usePlaintext().build();
- this.drmsChannel = ManagedChannelBuilder.forAddress(configuration.getOutboundEventProcessor().getDrmsHost(),
- configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
- this.workflowServiceStub = WorkflowServiceGrpc.newBlockingStub(workflowChannel);
- this.resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(drmsChannel);
- this.storagePreferenceServiceBlockingStub = StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
- this.init();
+ this.init(configuration);
}
@Override
- public void init() throws Exception {
+ public void init(Configuration configuration) throws Exception {
+ this.drmsConnector = new DRMSConnector(configuration);
+ this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
dozerBeanMapper = new DozerBeanMapper();
BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
@Override
@@ -72,8 +52,8 @@ public class OutboundEventProcessor implements MessageProcessor {
@Override
public void close() throws Exception {
- this.workflowChannel.shutdown();
- this.drmsChannel.shutdown();
+ this.drmsConnector.close();
+ this.workflowServiceConnector.close();
}
@@ -103,84 +83,54 @@ public class OutboundEventProcessor implements MessageProcessor {
private void processEvent(DataOrchestratorEntity entity) {
try {
- DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
- .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
- .setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(entity.getOwnerId())
- .setTenantId(entity.getTenantId())
- .build())
- .build();
-
- StoragePreferenceFetchRequest storagePreferenceFetchRequest = StoragePreferenceFetchRequest.newBuilder()
- .setStoragePreferenceId(entity.getStoragePreferenceId()).setAuthToken(serviceAuthToken).build();
-
- StoragePreferenceFetchResponse response = storagePreferenceServiceBlockingStub
- .fetchStoragePreference(storagePreferenceFetchRequest);
-
- if (!response.hasStoragePreference()) {
+
+ String ownerId = entity.getOwnerId();
+ String resourcePath = entity.getResourcePath();
+ String tail = resourcePath.substring(resourcePath.indexOf(ownerId));
+ String[] collections = tail.split("/");
+
+ Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+ if (optionalStorPref.isEmpty()) {
entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("StoragePreference not found ");
+ entity.setError("StoragePreference not found for host: " + entity.getHostName());
repository.save(entity);
return;
}
- //currently only ssh is working
- if (response.getStoragePreference().getSshStoragePreference().isInitialized()) {
- GenericResource genericResource = GenericResource
- .newBuilder()
- .setResourceId(entity.getResourceId())
- .setResourceName(entity.getResourceName())
- .setResourcePath(entity.getResourcePath())
- .setType("FILE")
- .setSshPreference(response.getStoragePreference().getSshStoragePreference()).build();
- ResourceCreateRequest resourceCreateRequest = ResourceCreateRequest
- .newBuilder()
- .setAuthToken(serviceAuthToken)
- .setResource(genericResource)
- .build();
- GenericResource resource = null;
- try {
- ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
- resource = resourceCreateResponse.getResource();
- } catch (Exception ex) {
- LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
- repository.save(entity);
- return;
- }
+ String parentId = optionalStorPref.get();
+
+ for (int i = 1; i < collections.length - 1; i++) {
+ String resourceName = collections[i];
+ String entityId = Utils.getId(resourcePath.substring(resourcePath.indexOf(resourceName)));
+ String path = entity.getResourcePath().substring(0, entity.getResourcePath().indexOf(resourceName));
+ Optional<GenericResource> optionalGenericResource =
+ this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, parentId, "COLLECTION");
+ if (optionalGenericResource.isPresent()) {
+ parentId = optionalGenericResource.get().getResourceId();
- try {
- WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken
- .newBuilder()
- .setAccessToken("")
- .build();
- WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
- .setResourceId(resource.getResourceId())
- .build();
-
- WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
- .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
- this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
- } catch (Exception ex) {
- LOGGER.error("Error occurred while invoking workflow engine", entity.getResourceId(), ex);
+ } else {
entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Error occurred while invoking workflow engine" + ex.getMessage());
+ entity.setError("Collection structure creation failed: " + entity.getHostName());
repository.save(entity);
return;
}
- } else {
- LOGGER.error("Incorrect storage preference {}", entity.getStoragePreferenceId());
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Incorrect storage preference " + entity.getStoragePreferenceId());
+ }
+
+ Optional<GenericResource> optionalGenericResource =
+ this.drmsConnector.createResource(repository, entity, entity.getResourceId(),
+ collections[collections.length - 1], entity.getResourcePath(),
+ parentId, "FILE");
+
+
+ if (optionalGenericResource.isPresent()) {
+ this.workflowServiceConnector.invokeWorkflow(repository, entity, optionalGenericResource.get());
+ entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
repository.save(entity);
- return;
+ } else {
+
}
- entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
- repository.save(entity);
} catch (Exception exception) {
- LOGGER.error("Error occurred while processing outbound data orcehstrator event", exception);
+ LOGGER.error("Error occurred while processing outbound data orchestrator event", exception);
entity.setEventStatus(EventStatus.ERRORED.name());
entity.setError("Error occurred while processing ");
repository.save(entity);
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/AbstractTask.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/AbstractTask.java
deleted file mode 100644
index 91f6e59..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/AbstractTask.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.datalake.orchestrator.core;
-
-/**
- * TODO: Task framework implementation
- */
-public abstract class AbstractTask {
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/adaptors/StorageAdaptor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/adaptors/StorageAdaptor.java
deleted file mode 100644
index aab4c32..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/adaptors/StorageAdaptor.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.core.adaptors;
-
-import java.util.List;
-
-public interface StorageAdaptor<T> {
-
- void save(T object) throws UnsupportedOperationException;
-
- void delete(String id) throws UnsupportedOperationException;
-
- T get(String id) throws UnsupportedOperationException;
-
- T update(T object) throws UnsupportedOperationException;
-
- List<T> poll(int numOfEvents) throws UnsupportedOperationException;
-
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/connector/AbstractConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/connector/AbstractConnector.java
new file mode 100644
index 0000000..f206255
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/connector/AbstractConnector.java
@@ -0,0 +1,14 @@
+package org.apache.airavata.datalake.orchestrator.core.connector;
+
+/**
+ * Interface to implement external connectors
+ */
+public interface AbstractConnector<T> {
+
+ void init(T configuration) throws Exception;
+
+ void close() throws Exception;
+
+ boolean isOpen();
+
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java
index 215b109..df63877 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java
@@ -1,8 +1,8 @@
package org.apache.airavata.datalake.orchestrator.core.processor;
-public interface MessageProcessor extends Runnable{
+public interface MessageProcessor<T> extends Runnable{
- void init() throws Exception;
+ void init(T configuration) throws Exception;
void close() throws Exception;
}
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index 750b885..9201ead 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -120,21 +120,12 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
userProps.put("username", callUser.getUsername());
userProps.put("tenantId", callUser.getTenantId());
-
- String storagePreferenceId = "";
-
- if (request.getResource().getStoragePreferenceCase()
- .equals(GenericResource.StoragePreferenceCase.S3_PREFERENCE)) {
- storagePreferenceId = request.getResource().getS3Preference().getStoragePreferenceId();
- } else if (request.getResource().getStoragePreferenceCase()
- .equals(GenericResource.StoragePreferenceCase.SSH_PREFERENCE)) {
- storagePreferenceId = request.getResource().getSshPreference().getStoragePreferenceId();
- }
+ String parentId = request.getResource().getParentId();
String entityId = request.getResource().getResourceId();
Map<String, Object> serializedMap = GenericResourceSerializer.serializeToMap(request.getResource());
Optional<Entity> exEntity = CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
- storagePreferenceId, type, entityId,
+ parentId, type, entityId,
request.getResource().getResourceName(), request.getResource().getResourceName(),
callUser.getUsername());
@@ -148,10 +139,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
serializedMap.put("lastModifiedTime", exEntity.get().getCreatedAt());
serializedMap.put("owner", exEntity.get().getOwnerId());
- if (!storagePreferenceId.isEmpty()) {
+ if (!parentId.isEmpty()) {
this.neo4JConnector.mergeNodesWithParentChildRelationShip(serializedMap, new HashMap<>(),
request.getResource().getType(), StoragePreferenceConstants.STORAGE_PREFERENCE_LABEL,
- callUser.getUsername(), entityId, storagePreferenceId, callUser.getTenantId());
+ callUser.getUsername(), entityId, parentId, callUser.getTenantId());
} else {
this.neo4JConnector.mergeNode(serializedMap, request.getResource().getType(),
callUser.getUsername(), entityId, callUser.getTenantId());
diff --git a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 0daacb7..fd2c03a 100644
Binary files a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb and b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb differ
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
index 6a29891..db7741a 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
@@ -35,4 +35,5 @@ message GenericResource {
string type=6;
string parent_resource_path = 7;
string resource_name = 8;
+ string parent_id = 9;
}
\ No newline at end of file