Posted to commits@airavata.apache.org by di...@apache.org on 2021/08/23 11:37:20 UTC
[airavata-data-lake] branch master updated: Rewriting data orchestrator logic to support directory scanning through MFT
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new ef6a5b0 Rewriting data orchestrator logic to support directory scanning through MFT
ef6a5b0 is described below
commit ef6a5b0f0d748a459b59c331f33c032632fa0fbb
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Aug 23 07:37:09 2021 -0400
Rewriting data orchestrator logic to support directory scanning through MFT
---
ansible/inventories/test/group_vars/all/vars.yml | 2 +-
.../templates/data-orchestrator/config.yml.j2 | 2 +
.../clients/core/AbstractListener.java | 15 +-
.../clients/core/EventPublisher.java | 3 +-
.../file/client/model/Configuration.java | 9 +
.../file/client/publisher/FileEventPublisher.java | 5 +-
.../file/client/watcher/FileWatcher.java | 77 +++---
.../messaging/MessagingEvents.java | 9 -
.../messaging/model/NotificationEvent.java | 128 ++++------
.../model/NotificationEventDeserializer.java | 21 +-
.../model/NotificationEventSerializer.java | 16 +-
.../messaging/publisher/MessageProducer.java | 2 +-
.../persistance/entity/DataOrchestratorEntity.java | 226 ------------------
.../persistance/entity/OwnershipEntity.java | 56 -----
.../persistance/entity/WorkflowEntity.java | 108 ---------
.../persistance/entity/WorkflowTaskEntity.java | 118 ----------
.../entity/parser/DataParsingJobOutputEntity.java | 11 +
.../DataOrchestratorEventRepository.java | 32 ---
.../repository/WorkflowEntityRepository.java | 7 -
.../datalake/orchestrator/Configuration.java | 18 ++
.../orchestrator/connectors/DRMSConnector.java | 73 ++----
.../connectors/WorkflowServiceConnector.java | 6 +-
.../handlers/async/OrchestratorEventHandler.java | 24 +-
.../handlers/async/OrchestratorEventProcessor.java | 260 +++++++++++++++++++++
.../processor/InboundEventProcessor.java | 145 ------------
.../processor/OutboundEventProcessor.java | 197 ----------------
.../src/main/proto/parsing.proto | 1 +
.../wm/datasync/DataParsingWorkflowManager.java | 211 ++++++++---------
.../wm/datasync/DataSyncWorkflowManager.java | 6 +-
.../wm/datasync/WorkflowEngineAPIHandler.java | 17 +-
.../src/main/proto/service/WorkflowService.proto | 3 +-
31 files changed, 577 insertions(+), 1231 deletions(-)
diff --git a/ansible/inventories/test/group_vars/all/vars.yml b/ansible/inventories/test/group_vars/all/vars.yml
index 8463bab..134537b 100644
--- a/ansible/inventories/test/group_vars/all/vars.yml
+++ b/ansible/inventories/test/group_vars/all/vars.yml
@@ -42,7 +42,7 @@ custos_repo: "https://github.com/apache/airavata-custos.git"
custos_git_branch: develop
mft_default_agent_id: agent0
-mft_default_agent_host: 10.1.0.33
+mft_default_agent_host: 10.1.0.42
mft_default_agent_advertised_url: https://beta.iubemcenter.scigap.org:8443/downloads
mft_default_agent_port: 3333
diff --git a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2 b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
index 40f6fdb..914245a 100644
--- a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
+++ b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
@@ -10,6 +10,8 @@ outboundEventProcessor:
workflowPort: {{ workflow_manager_grpc_port }}
drmsHost: "{{ datalake_drms_host }}"
drmsPort: {{ datalake_drms_grpc_port }}
+ mftHost: "{{ mft_api_service_host }}"
+ mftPort: {{ mft_api_service_grpc_port }}
consumer:
brokerURL: "{{ datalake_data_orch_broker_url }}"
consumerGroup: "{{ datalake_data_orch_broker_consumer_group }}"
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
index ba9932b..5395376 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.clients.core;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,28 +21,28 @@ public abstract class AbstractListener implements EventListener {
public void onRegistered(NotificationEvent event) throws Exception {
LOGGER.info(" Registration event received for path " + event.getResourcePath());
- eventPublisher.publish(event, MessagingEvents.REGISTER);
+ eventPublisher.publish(event, NotificationEvent.Type.REGISTER);
}
public void onCreated(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
- event.getResourcePath() + ":" + event.getResourceName() + " Created");
- eventPublisher.publish(event, MessagingEvents.CREATE);
+ event.getResourcePath() + ":" + event.getResourcePath() + " Created");
+ eventPublisher.publish(event, NotificationEvent.Type.CREATE);
}
public void onModified(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
- event.getResourcePath() + ":" + event.getResourceName() + " Created");
- eventPublisher.publish(event, MessagingEvents.MODIFY);
+ event.getResourcePath() + ":" + event.getResourcePath() + " Created");
+ eventPublisher.publish(event, NotificationEvent.Type.MODIFY);
}
public void onDeleted(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
- event.getResourcePath() + ":" + event.getResourceName() + " Created");
- eventPublisher.publish(event, MessagingEvents.DELETE);
+ event.getResourcePath() + ":" + event.getBasePath() + " Created");
+ eventPublisher.publish(event, NotificationEvent.Type.DELETE);
}
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
index 53b40c9..8b52c79 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.clients.core;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import java.util.concurrent.ExecutionException;
@@ -11,7 +10,7 @@ import java.util.concurrent.ExecutionException;
public interface EventPublisher {
- public void publish(NotificationEvent notificationEvent, MessagingEvents event) throws Exception;
+ public void publish(NotificationEvent notificationEvent, NotificationEvent.Type eventType) throws Exception;
}
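For readers tracking the API change: publish() now takes the event type as NotificationEvent.Type instead of the removed MessagingEvents enum. A minimal sketch of an implementation against the new contract follows; the LoggingEventPublisher class and its console output are illustrative only, not part of this commit:

    import org.apache.airavata.dataorchestrator.clients.core.EventPublisher;
    import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;

    // Hypothetical publisher, shown only to illustrate the new publish(event, type) signature.
    public class LoggingEventPublisher implements EventPublisher {
        @Override
        public void publish(NotificationEvent notificationEvent, NotificationEvent.Type eventType) throws Exception {
            // The type now travels on the event itself; FileEventPublisher below does the same.
            notificationEvent.setEventType(eventType);
            System.out.println(eventType + " -> " + notificationEvent.getResourcePath());
        }
    }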
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
index 630db9c..d4a42df 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
@@ -3,6 +3,7 @@ package org.apache.airavata.dataorchestrator.file.client.model;
public class Configuration {
private String listeningPath;
private String hostName;
+ private int depth = 2;
private Producer producer;
@@ -43,6 +44,14 @@ public class Configuration {
this.custos = custos;
}
+ public int getDepth() {
+ return depth;
+ }
+
+ public void setDepth(int depth) {
+ this.depth = depth;
+ }
+
public static class Producer {
private String brokerURL;
private String publisherId;
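The new depth field (default 2) controls how many path segments below the listening path are kept when an event is reported; the FileWatcher change further down consumes it. A hedged usage sketch — only getDepth/setDepth are from this commit, and the loader call is a placeholder:

    // 'loadListenerConfiguration' stands in for however the listener actually builds its Configuration.
    Configuration config = loadListenerConfiguration();
    config.setDepth(3);              // override the default of 2
    int depth = config.getDepth();   // read by FileWatcher when truncating event paths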
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
index 82b8199..a228d1c 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
@@ -2,7 +2,6 @@ package org.apache.airavata.dataorchestrator.file.client.publisher;
import org.apache.airavata.dataorchestrator.clients.core.EventPublisher;
import org.apache.airavata.dataorchestrator.file.client.model.Configuration;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import org.apache.airavata.dataorchestrator.messaging.publisher.MessageProducer;
import org.apache.kafka.clients.producer.Callback;
@@ -25,8 +24,8 @@ public class FileEventPublisher implements EventPublisher {
}
@Override
- public void publish(NotificationEvent notificationEvent, MessagingEvents event) throws ExecutionException, InterruptedException {
- notificationEvent.getContext().setEvent(event);
+ public void publish(NotificationEvent notificationEvent, NotificationEvent.Type eventType) throws ExecutionException, InterruptedException {
+ notificationEvent.setEventType(eventType);
messageProducer.publish(configuration.getProducer().getPublisherTopic(), notificationEvent, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
index 9e16018..c1bf220 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
@@ -29,15 +29,12 @@ public class FileWatcher implements Runnable {
private Configuration configuration;
-
public FileWatcher(File rootFolder, Configuration configuration) throws IOException {
this.rootFolder = rootFolder;
this.configuration = configuration;
}
-
@Override
-
public void run() {
LOGGER.info("Watcher service starting at " + rootFolder.getAbsolutePath());
@@ -51,12 +48,9 @@ public class FileWatcher implements Runnable {
} catch (Exception e) {
LOGGER.error("Error occurred while watching folder " + rootFolder.getAbsolutePath(), e);
Thread.currentThread().interrupt();
-
}
-
}
-
protected void pollEvents(WatchService watchService) throws Exception {
WatchKey key = watchService.take();
@@ -68,10 +62,10 @@ public class FileWatcher implements Runnable {
if (!key.reset()) {
keyPathMap.remove(key);
}
+
if (keyPathMap.isEmpty()) {
return;
}
-
}
@@ -82,12 +76,14 @@ public class FileWatcher implements Runnable {
path = parentPath.resolve(path);
File file = path.toFile();
- FileEvent event = getFileEvent(file);
+ Optional<FileEvent> event = getFileEvent(file);
if (kind == ENTRY_CREATE) {
- for (AbstractListener listener : listeners) {
- listener.onCreated(event);
+ if (event.isPresent()) {
+ for (AbstractListener listener : listeners) {
+ listener.onCreated(event.get());
+ }
}
if (file.isDirectory()) {
@@ -96,23 +92,22 @@ public class FileWatcher implements Runnable {
} else if (kind == ENTRY_MODIFY) {
- for (AbstractListener listener : listeners) {
-
- listener.onModified(event);
-
+ if (event.isPresent()) {
+ for (AbstractListener listener : listeners) {
+ listener.onModified(event.get());
+ }
}
} else if (kind == ENTRY_DELETE) {
- for (AbstractListener listener : listeners) {
- listener.onDeleted(event);
+ if (event.isPresent()) {
+ for (AbstractListener listener : listeners) {
+ listener.onDeleted(event.get());
+ }
}
-
}
-
}
-
public FileWatcher addListener(AbstractListener listener) {
listeners.add(listener);
@@ -152,24 +147,44 @@ public class FileWatcher implements Runnable {
*/
- protected FileEvent getFileEvent(File file) {
+ protected Optional<FileEvent> getFileEvent(File file) {
FileEvent event = new FileEvent();
- if (file.isDirectory()) {
+
+
+ String absolutePath = file.getAbsolutePath();
+ if (configuration.getDepth() > 0) {
+ String relativePath = absolutePath.substring(configuration.getListeningPath().length());
+ if (relativePath.startsWith("/")) {
+ relativePath = relativePath.substring(1);
+ }
+ String[] relativeParts = relativePath.split("/");
+ if (relativeParts.length >= configuration.getDepth()) {
+ String beginPath = configuration.getListeningPath();
+ beginPath = beginPath.endsWith("/") ? beginPath.substring(0, beginPath.length()-1) : beginPath;
+ for (int step = 0; step < configuration.getDepth(); step++) {
+ beginPath = beginPath + "/" + relativeParts[step];
+ }
+ absolutePath = beginPath;
+ } else {
+ LOGGER.warn("Depth of path {} is not greater or equal to required depth {}", absolutePath, configuration.getDepth());
+ return Optional.empty();
+ }
+ }
+
+ if (new File(absolutePath).isDirectory()) {
event.setResourceType(Constants.FOLDER);
} else {
event.setResourceType(Constants.FILE);
}
- event.setResourceName(file.getName());
- event.setResourcePath(file.getAbsolutePath());
- NotificationEvent.Context context = new NotificationEvent.Context();
- context.setOccuredTime(System.currentTimeMillis());
- context.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()
+
+ event.setResourcePath(absolutePath);
+ event.setOccuredTime(System.currentTimeMillis());
+ event.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()
+ ":" + configuration.getCustos().getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)));
- context.setBasePath(configuration.getListeningPath());
- context.setTenantId(configuration.getCustos().getTenantId());
- context.setHostName(configuration.getHostName());
- event.setContext(context);
- return event;
+ event.setBasePath(configuration.getListeningPath());
+ event.setTenantId(configuration.getCustos().getTenantId());
+ event.setHostName(configuration.getHostName());
+ return Optional.of(event);
}
private static void registerDir(Path path, WatchService watchService) throws
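The heart of this change is the depth-based truncation in getFileEvent: any file event under the listening path is mapped to the directory sitting exactly depth levels below it, and shallower events are dropped, so MFT can later scan that directory as a unit. A standalone sketch of the same logic with a worked example (the method name and paths are illustrative):

    // Mirrors the truncation added to getFileEvent above; returns null where the original returns Optional.empty().
    static String truncateToDepth(String listeningPath, int depth, String absolutePath) {
        String relativePath = absolutePath.substring(listeningPath.length());
        if (relativePath.startsWith("/")) {
            relativePath = relativePath.substring(1);
        }
        String[] parts = relativePath.split("/");
        if (parts.length < depth) {
            return null;  // shallower than the configured depth; the watcher drops this event
        }
        String base = listeningPath.endsWith("/")
                ? listeningPath.substring(0, listeningPath.length() - 1) : listeningPath;
        for (int step = 0; step < depth; step++) {
            base = base + "/" + parts[step];
        }
        return base;
    }

    // truncateToDepth("/data/listen", 2, "/data/listen/projA/run1/out.csv") -> "/data/listen/projA/run1"
    // truncateToDepth("/data/listen", 2, "/data/listen/projA")              -> null (event dropped)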
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java
deleted file mode 100644
index 3771b72..0000000
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.airavata.dataorchestrator.messaging;
-
-public enum MessagingEvents {
-
- REGISTER,
- CREATE,
- MODIFY,
- DELETE
-}
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
index 1cda9db..baecaeb 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
@@ -1,26 +1,25 @@
package org.apache.airavata.dataorchestrator.messaging.model;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
-
-import java.io.Serializable;
-import java.util.UUID;
-
/**
* Notification event represents triggering messages
*/
public class NotificationEvent {
- private String resourcePath;
- private String resourceName;
- private String resourceType;
- private Context context;
- private String id;
-
-
- public NotificationEvent() {
- this.id = UUID.randomUUID().toString();
+ public enum Type {
+ REGISTER,
+ CREATE,
+ MODIFY,
+ DELETE
}
+ private String resourcePath;
+ private String resourceType;
+ private Long occuredTime;
+ private String authToken;
+ private String tenantId;
+ private String hostName;
+ private String basePath;
+ private Type eventType;
public String getResourcePath() {
return resourcePath;
@@ -30,14 +29,6 @@ public class NotificationEvent {
this.resourcePath = resourcePath;
}
- public String getResourceName() {
- return resourceName;
- }
-
- public void setResourceName(String resourceName) {
- this.resourceName = resourceName;
- }
-
public String getResourceType() {
return resourceType;
}
@@ -46,84 +37,51 @@ public class NotificationEvent {
this.resourceType = resourceType;
}
- public Context getContext() {
- return context;
+ public Long getOccuredTime() {
+ return occuredTime;
}
- public void setContext(Context context) {
- this.context = context;
+ public void setOccuredTime(Long occuredTime) {
+ this.occuredTime = occuredTime;
}
- public String getId() {
- return id;
+ public String getAuthToken() {
+ return authToken;
}
- public void setId(String id) {
- this.id = id;
+ public void setAuthToken(String authToken) {
+ this.authToken = authToken;
}
- public static class Context implements Serializable {
-
- private MessagingEvents event;
- private Long occuredTime;
- private String authToken;
- private String tenantId;
- private String hostName;
- private String basePath;
-
-
- public MessagingEvents getEvent() {
- return event;
- }
-
- public void setEvent(MessagingEvents event) {
- this.event = event;
- }
-
- public Long getOccuredTime() {
- return occuredTime;
- }
-
- public void setOccuredTime(Long occuredTime) {
- this.occuredTime = occuredTime;
- }
-
- public String getAuthToken() {
- return authToken;
- }
-
- public void setAuthToken(String authToken) {
- this.authToken = authToken;
- }
-
- public String getTenantId() {
- return tenantId;
- }
-
- public void setTenantId(String tenantId) {
- this.tenantId = tenantId;
- }
+ public String getTenantId() {
+ return tenantId;
+ }
- public String getBasePath() {
- return basePath;
- }
+ public void setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ }
- public void setBasePath(String basePath) {
- this.basePath = basePath;
- }
+ public String getHostName() {
+ return hostName;
+ }
- public String getHostName() {
- return hostName;
- }
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
+ public String getBasePath() {
+ return basePath;
}
- public String getResourceId() {
- return context.hostName+ ":" + resourcePath + ":" + resourceType;
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
}
+ public Type getEventType() {
+ return eventType;
+ }
+ public void setEventType(Type eventType) {
+ this.eventType = eventType;
+ }
}
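With the Context inner class and the generated id removed, all event metadata now sits flat on NotificationEvent. A construction sketch with placeholder values (the listener itself uses Constants.FOLDER/Constants.FILE for the resource type; the literal below is a stand-in):

    NotificationEvent event = new NotificationEvent();
    event.setResourcePath("/data/listen/projA/run1");   // placeholder path
    event.setResourceType("FOLDER");                    // placeholder; the listener passes Constants.FOLDER
    event.setBasePath("/data/listen");
    event.setHostName("host-a.example.org");
    event.setTenantId("tenant-1");
    event.setOccuredTime(System.currentTimeMillis());
    event.setAuthToken("base64-encoded-agent-credentials");
    event.setEventType(NotificationEvent.Type.CREATE);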
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index f19d272..d8d834a 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.messaging.model;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
@@ -19,18 +18,14 @@ public class NotificationEventDeserializer implements Deserializer<NotificationE
String deserialized = new String(bytes);
String parts[] = deserialized.split(",");
NotificationEvent event = new NotificationEvent();
- NotificationEvent.Context context = new NotificationEvent.Context();
- event.setId(parts[0]);
- context.setEvent(MessagingEvents.valueOf(parts[1]));
- context.setOccuredTime(Long.valueOf(parts[2]));
- context.setAuthToken(String.valueOf(parts[3]));
- context.setTenantId(String.valueOf(parts[4]));
- context.setHostName(parts[5]);
- context.setBasePath(parts[6]);
- event.setResourcePath(parts[7]);
- event.setResourceType(parts[8]);
- event.setResourceName(parts[9]);
- event.setContext(context);
+ event.setResourcePath(parts[0]);
+ event.setResourceType(parts[1]);
+ event.setOccuredTime(Long.valueOf(parts[2]));
+ event.setTenantId(parts[3]);
+ event.setHostName(parts[4]);
+ event.setBasePath(parts[5]);
+ event.setEventType(NotificationEvent.Type.valueOf(parts[6]));
+ event.setAuthToken(parts[7]);
return event;
}
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index 7a55770..c92d181 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -15,16 +15,14 @@ public class NotificationEventSerializer implements Serializer<NotificationEvent
@Override
public byte[] serialize(String s, NotificationEvent notificationEvent) {
- String serializedData = notificationEvent.getId() + "," +
- notificationEvent.getContext().getEvent().name() + "," +
- notificationEvent.getContext().getOccuredTime() + "," +
- notificationEvent.getContext().getAuthToken() + "," +
- notificationEvent.getContext().getTenantId() + "," +
- notificationEvent.getContext().getHostName() + "," +
- notificationEvent.getContext().getBasePath() + "," +
- notificationEvent.getResourcePath() + "," +
+ String serializedData = notificationEvent.getResourcePath() + "," +
notificationEvent.getResourceType() + "," +
- notificationEvent.getResourceName();
+ notificationEvent.getOccuredTime() + "," +
+ notificationEvent.getTenantId() + "," +
+ notificationEvent.getHostName() + "," +
+ notificationEvent.getBasePath() + "," +
+ notificationEvent.getEventType() + "," +
+ notificationEvent.getAuthToken();
return serializedData.getBytes();
}
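The Kafka wire format is now a plain comma-separated record with a fixed field order, matched positionally by the deserializer above. Continuing the construction sketch from the NotificationEvent section, a hedged round trip would look like this (topic name is a placeholder):

    NotificationEventSerializer serializer = new NotificationEventSerializer();
    NotificationEventDeserializer deserializer = new NotificationEventDeserializer();

    // On the wire: resourcePath,resourceType,occuredTime,tenantId,hostName,basePath,eventType,authToken
    byte[] wire = serializer.serialize("orchestrator-topic", event);
    NotificationEvent copy = deserializer.deserialize("orchestrator-topic", wire);
    // Note: any field value containing a comma would corrupt this positional format.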
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
index e4aa253..4abded7 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
@@ -30,7 +30,7 @@ public class MessageProducer {
public void publish(String topic, NotificationEvent notificationMessage, Callback callback) throws ExecutionException, InterruptedException {
try {
final ProducerRecord<String, NotificationEvent> record = new ProducerRecord<>(topic,
- notificationMessage.getId(),
+ notificationMessage.getResourcePath(),
notificationMessage);
producer.send(record, callback).get();
} finally {
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java
deleted file mode 100644
index d1843fd..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-import java.util.Set;
-
-/**
- * DataOrchestrator entity
- */
-@Entity
-@Table(name = "DATAORCHESTRATOR_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class DataOrchestratorEntity {
-
- @Id
- private String id;
-
- @Column(nullable = false)
- private String resourceId;
-
- @Column(nullable = false)
- private String resourcePath;
-
- @Column(nullable = false)
- private String resourceName;
-
- @Column(nullable = false)
- private String resourceType;
-
- @Column(nullable = false)
- private Date occurredTime;
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @CreatedDate
- private Date createdAt;
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @LastModifiedDate
- private Date lastModifiedAt;
-
- @Column(nullable = false)
- private String eventStatus;
-
- @Column(nullable = false)
- private String eventType;
-
- @Column(nullable = false)
- private String tenantId;
-
- @Column(nullable = false)
- private String agentId;
-
- @Column(nullable = false)
- private String authToken;
-
- @Column(nullable = false)
- private String hostName;
-
- @Lob
- private String error;
-
- @OneToMany(fetch = FetchType.EAGER, mappedBy = "dataOrchestratorEntity", orphanRemoval = true, cascade = CascadeType.ALL)
- private Set<WorkflowEntity> workFlowEntities;
-
- @OneToMany(fetch = FetchType.EAGER, mappedBy = "dataOrchestratorEntity", orphanRemoval = true, cascade = CascadeType.ALL)
- private Set<OwnershipEntity> ownershipEntities;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getResourceId() {
- return resourceId;
- }
-
- public void setResourceId(String resourceId) {
- this.resourceId = resourceId;
- }
-
- public String getResourcePath() {
- return resourcePath;
- }
-
- public void setResourcePath(String resourcePath) {
- this.resourcePath = resourcePath;
- }
-
- public String getResourceName() {
- return resourceName;
- }
-
- public void setResourceName(String resourceName) {
- this.resourceName = resourceName;
- }
-
- public String getResourceType() {
- return resourceType;
- }
-
- public void setResourceType(String resourceType) {
- this.resourceType = resourceType;
- }
-
- public Date getOccurredTime() {
- return occurredTime;
- }
-
- public void setOccurredTime(Date occurredTime) {
- this.occurredTime = occurredTime;
- }
-
- public Date getCreatedAt() {
- return createdAt;
- }
-
- public void setCreatedAt(Date createdAt) {
- this.createdAt = createdAt;
- }
-
- public Date getLastModifiedAt() {
- return lastModifiedAt;
- }
-
- public void setLastModifiedAt(Date lastModifiedAt) {
- this.lastModifiedAt = lastModifiedAt;
- }
-
- public String getEventStatus() {
- return eventStatus;
- }
-
- public void setEventStatus(String status) {
- this.eventStatus = status;
- }
-
- public Set<WorkflowEntity> getWorkFlowEntities() {
- return workFlowEntities;
- }
-
- public void setWorkFlowEntities(Set<WorkflowEntity> workFlowEntities) {
- this.workFlowEntities = workFlowEntities;
- }
-
- public String getError() {
- return error;
- }
-
- public void setError(String error) {
- this.error = error;
- }
-
- public String getEventType() {
- return eventType;
- }
-
- public void setEventType(String eventType) {
- this.eventType = eventType;
- }
-
- public String getTenantId() {
- return tenantId;
- }
-
- public void setTenantId(String tenantId) {
- this.tenantId = tenantId;
- }
-
- public String getAgentId() {
- return agentId;
- }
-
- public void setAgentId(String agentId) {
- this.agentId = agentId;
- }
-
- public String getAuthToken() {
- return authToken;
- }
-
- public void setAuthToken(String authToken) {
- this.authToken = authToken;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- public Set<OwnershipEntity> getOwnershipEntities() {
- return ownershipEntities;
- }
-
- public void setOwnershipEntities(Set<OwnershipEntity> ownershipEntities) {
- this.ownershipEntities = ownershipEntities;
- }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java
deleted file mode 100644
index ec6e4eb..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-
-@Entity
-@Table(name = "OWNERSHIP_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class OwnershipEntity {
-
- @Id
- private String id;
-
- @Column(name = "USER_ID")
- private String userId;
-
- @Column(name = "PERMISSION_ID")
- private String permissionId;
-
- @ManyToOne
- @JoinColumn(name = "DATA_ORCHESTRATOR_ENTITY_ID")
- private DataOrchestratorEntity dataOrchestratorEntity;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- public String getPermissionId() {
- return permissionId;
- }
-
- public void setPermissionId(String permissionId) {
- this.permissionId = permissionId;
- }
-
- public DataOrchestratorEntity getDataOrchestratorEntity() {
- return dataOrchestratorEntity;
- }
-
- public void setDataOrchestratorEntity(DataOrchestratorEntity dataOrchestratorEntity) {
- this.dataOrchestratorEntity = dataOrchestratorEntity;
- }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java
deleted file mode 100644
index d585020..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-import java.util.Set;
-
-/**
- * An workflow class that represents the workflow entity
- */
-@Entity
-@Table(name = "WORKFLOW_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class WorkflowEntity {
-
- @Id
- private String id;
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @CreatedDate
- private Date createdAt;
-
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @LastModifiedDate
- private Date lastModifiedAt;
-
- @Column(nullable = false)
- private String status;
-
-
- @ManyToOne
- @JoinColumn(name = "dataorchestrator_entity_id")
- private DataOrchestratorEntity dataOrchestratorEntity;
-
- @OneToMany(fetch = FetchType.EAGER, mappedBy = "workflowEntity", orphanRemoval = true, cascade = CascadeType.ALL)
- private Set<WorkflowTaskEntity> workflowTaskEntities;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public Date getCreatedAt() {
- return createdAt;
- }
-
- public void setCreatedAt(Date createdAt) {
- this.createdAt = createdAt;
- }
-
- public Date getLastModifiedAt() {
- return lastModifiedAt;
- }
-
- public void setLastModifiedAt(Date lastModifiedAt) {
- this.lastModifiedAt = lastModifiedAt;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public DataOrchestratorEntity getDataOrchestratorEntity() {
- return dataOrchestratorEntity;
- }
-
- public void setDataOrchestratorEntity(DataOrchestratorEntity dataOrchestratorEntity) {
- this.dataOrchestratorEntity = dataOrchestratorEntity;
- }
-
- public Set<WorkflowTaskEntity> getWorkflowTaskEntities() {
- return workflowTaskEntities;
- }
-
- public void setWorkflowTaskEntities(Set<WorkflowTaskEntity> workflowTaskEntities) {
- this.workflowTaskEntities = workflowTaskEntities;
- }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java
deleted file mode 100644
index b20ddd8..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.WorkflowEntity;
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-
-/**
- * An entity class represents the task entity
- */
-@Entity
-@Table(name = "WORKFLOW_TASK_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class WorkflowTaskEntity {
-
- @Id
- private String id;
-
- @ManyToOne
- @JoinColumn(name = "workflow_entity_id")
- private WorkflowEntity workflowEntity;
-
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @CreatedDate
- private Date createdAt;
-
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @LastModifiedDate
- private Date lastModifiedAt;
-
- @Column(nullable = false)
- private String status;
-
- @Lob
- private String error;
-
- private int errorCode;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public WorkflowEntity getWorkflowEntity() {
- return workflowEntity;
- }
-
- public void setWorkflowEntity(WorkflowEntity workFlowEntity) {
- this.workflowEntity = workFlowEntity;
- }
-
- public Date getCreatedAt() {
- return createdAt;
- }
-
- public void setCreatedAt(Date createdAt) {
- this.createdAt = createdAt;
- }
-
- public Date getLastModifiedAt() {
- return lastModifiedAt;
- }
-
- public void setLastModifiedAt(Date lastModifiedAt) {
- this.lastModifiedAt = lastModifiedAt;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public String getError() {
- return error;
- }
-
- public void setError(String error) {
- this.error = error;
- }
-
- public int getErrorCode() {
- return errorCode;
- }
-
- public void setErrorCode(int errorCode) {
- this.errorCode = errorCode;
- }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
index 27aa4fd..b37c123 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
@@ -36,6 +36,9 @@ public class DataParsingJobOutputEntity {
@Column(name = "OUTPUT_TYPE")
private DataParsingJobOutputType outputType;
+ @Column(name = "OUTPUT_DIR_RESOURCE_ID")
+ private String outputDirectoryResourceId;
+
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "DATA_PARSER_OUTPUT_INTERFACE_ID", insertable=false, updatable=false)
private DataParserOutputInterfaceEntity dataParserOutputInterface;
@@ -91,4 +94,12 @@ public class DataParsingJobOutputEntity {
public void setDataParsingJobEntity(DataParsingJobEntity dataParsingJobEntity) {
this.dataParsingJobEntity = dataParsingJobEntity;
}
+
+ public String getOutputDirectoryResourceId() {
+ return outputDirectoryResourceId;
+ }
+
+ public void setOutputDirectoryResourceId(String outputDirectoryResourceId) {
+ this.outputDirectoryResourceId = outputDirectoryResourceId;
+ }
}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java
deleted file mode 100644
index 43f2eb9..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.data.jpa.repository.Query;
-
-import java.util.List;
-
-public interface DataOrchestratorEventRepository extends JpaRepository<DataOrchestratorEntity, String> {
-
- @Query(value = "select * from DATAORCHESTRATOR_ENTITY s where s.eventStatus = ?1 ORDER BY occurredTime DESC", nativeQuery = true)
- public List<DataOrchestratorEntity> findAllEntitiesWithGivenStatus(String eventStatus);
-
-
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java
deleted file mode 100644
index 4014186..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.WorkflowEntity;
-import org.springframework.data.jpa.repository.JpaRepository;
-
-public interface WorkflowEntityRepository extends JpaRepository<WorkflowEntity, String> {
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
index d728ef1..52494e5 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
@@ -153,6 +153,8 @@ public class Configuration {
private String drmsHost;
private int drmsPort;
private int pollingInterval;
+ private String mftHost;
+ private int mftPort;
public OutboundEventProcessorConfig() {
@@ -198,6 +200,22 @@ public class Configuration {
public void setPollingInterval(int pollingInterval) {
this.pollingInterval = pollingInterval;
}
+
+ public String getMftHost() {
+ return mftHost;
+ }
+
+ public void setMftHost(String mftHost) {
+ this.mftHost = mftHost;
+ }
+
+ public int getMftPort() {
+ return mftPort;
+ }
+
+ public void setMftPort(int mftPort) {
+ this.mftPort = mftPort;
+ }
}
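The new mftHost/mftPort values give the outbound side the MFT API endpoint it needs for directory scanning. A hedged sketch of turning them into a gRPC channel, assuming OutboundEventProcessorConfig is the nested config type shown above and a plaintext connection (channel options are not part of this commit):

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    Configuration.OutboundEventProcessorConfig outbound = configuration.getOutboundEventProcessor();
    ManagedChannel mftChannel = ManagedChannelBuilder
            .forAddress(outbound.getMftHost(), outbound.getMftPort())
            .usePlaintext()   // assumption; transport security is not shown in the commit
            .build();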
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index e24eb25..96fac88 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -11,10 +11,6 @@ import org.apache.airavata.datalake.drms.sharing.ShareEntityWithUserRequest;
import org.apache.airavata.datalake.drms.storage.*;
import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,50 +56,36 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
return !this.drmsChannel.isShutdown();
}
- public void shareWithUser(DataOrchestratorEntity entity) throws Exception {
-
- Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
+ public void shareWithUser(String authToken, String tenantId, String admin, String user, String resourceId, String permission) throws Exception {
DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
+ .setAccessToken(authToken)
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
.setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(adminOp.get().getUserId())
- .setTenantId(entity.getTenantId())
+ .setUsername(admin)
+ .setTenantId(tenantId)
.build())
.build();
- for (OwnershipEntity ownershipEntity : entity.getOwnershipEntities()) {
- if (ownershipEntity.getPermissionId().equals("ADMIN")) {
- continue;
- }
- ShareEntityWithUserRequest.Builder shareBuilder = ShareEntityWithUserRequest.newBuilder()
- .setAuthToken(serviceAuthToken)
- .setEntityId(entity.getResourceId())
- .setSharedUserId(ownershipEntity.getUserId())
- .setPermissionId(ownershipEntity.getPermissionId());
-
- this.sharingServiceBlockingStub.shareEntityWithUser(shareBuilder.build());
- }
+ ShareEntityWithUserRequest.Builder shareBuilder = ShareEntityWithUserRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .setEntityId(resourceId)
+ .setSharedUserId(user)
+ .setPermissionId(permission);
- }
+ this.sharingServiceBlockingStub.shareEntityWithUser(shareBuilder.build());
- public Optional<TransferMapping> getActiveTransferMapping(DataOrchestratorEntity entity, String hostname) throws Exception {
+ }
- Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
+ public Optional<TransferMapping> getActiveTransferMapping(String authToken, String tenantId,
+ String user, String hostName) throws Exception {
DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
+ .setAccessToken(authToken)
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
.setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(adminOp.get().getUserId())
- .setTenantId(entity.getTenantId())
+ .setUsername(user)
+ .setTenantId(tenantId)
.build())
.build();
FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
@@ -116,7 +98,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
transferMappingList.forEach(transferMapping -> {
if (transferMapping.getSourceStorage().getStorageCase()
.equals(AnyStorage.StorageCase.SSH_STORAGE)) {
- if (transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname)) {
+ if (transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostName)) {
transferMappingOp.set(transferMapping);
}
}
@@ -126,24 +108,22 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
}
- public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
+ public Optional<GenericResource> createResource(String authToken,
+ String tenantId,
String resourceId,
String resourceName,
String resourcePath,
String parentId,
- String type, String parentType) throws Exception {
+ String type, String parentType,
+ String user) throws Exception {
- Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
+ .setAccessToken(authToken)
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
.setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(adminOp.get().getUserId())
- .setTenantId(entity.getTenantId())
+ .setUsername(user)
+ .setTenantId(tenantId)
.build())
.build();
@@ -165,10 +145,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
return Optional.ofNullable(resourceCreateResponse.getResource());
} catch (Exception ex) {
- LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
- repository.save(entity);
+ LOGGER.error("Error occurred while creating resource {} in DRMS", resourcePath, ex);
return Optional.empty();
}
}
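DRMSConnector's sharing and lookup calls no longer read a persisted DataOrchestratorEntity; callers now pass the token, tenant, user, and resource explicitly. A hedged call sketch — tenant, user names, host, and the permission id are placeholders:

    import java.util.Optional;
    import org.apache.airavata.datalake.drms.storage.TransferMapping;

    DRMSConnector drms = new DRMSConnector(configuration);

    // Share an already-registered resource with a collaborating user.
    drms.shareWithUser(authToken, "tenant-1", "admin-user", "collab-user",
            resourceId, "VIEWER" /* placeholder permission id */);

    // Look up the active transfer mapping for the host that emitted the event.
    Optional<TransferMapping> mapping =
            drms.getActiveTransferMapping(authToken, "tenant-1", "admin-user", "host-a.example.org");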
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
index b95ccb3..d571568 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
@@ -10,6 +10,8 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowService
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* Drms connector to call DRMS services
*/
@@ -43,11 +45,11 @@ public class WorkflowServiceConnector implements AbstractConnector<Configuration
return false;
}
- public void invokeWorkflow(String authToken, String username, String tenantId, String sourceResourceId, String sourceCredentialToken,
+ public void invokeWorkflow(String authToken, String username, String tenantId, List<String> sourceResourceIds, String sourceCredentialToken,
String dstResourceId, String destinationCredentialToken) {
try {
WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
- .setSourceResourceId(sourceResourceId)
+ .addAllSourceResourceIds(sourceResourceIds)
.setDestinationResourceId(dstResourceId)
.setUsername(username)
.setTenantId(tenantId)
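invokeWorkflow now takes a list of source resource IDs, which is what allows a single workflow submission to cover every file discovered under a scanned directory. A hedged call sketch (IDs and tokens are placeholders):

    import java.util.Arrays;

    WorkflowServiceConnector workflow = new WorkflowServiceConnector(configuration);
    workflow.invokeWorkflow(authToken, "user-1", "tenant-1",
            Arrays.asList("source-resource-1", "source-resource-2"),
            sourceCredentialToken,
            "destination-resource-id",
            destinationCredentialToken);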
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index ff5fa67..d6d25bc 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -18,9 +18,6 @@
package org.apache.airavata.datalake.orchestrator.handlers.async;
import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.processor.InboundEventProcessor;
-import org.apache.airavata.datalake.orchestrator.processor.OutboundEventProcessor;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +27,6 @@ import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
/**
* Orchestrator event handler
@@ -45,10 +41,6 @@ public class OrchestratorEventHandler {
private ScheduledExecutorService ouboundExecutorService;
private MessageConsumer messageConsumer;
- @Autowired
- private DataOrchestratorEventRepository dataOrchestratorEventRepository;
-
-
public OrchestratorEventHandler() {
}
@@ -60,20 +52,18 @@ public class OrchestratorEventHandler {
configuration.getConsumer().getConsumerGroup(),
configuration.getConsumer().getMaxPollRecordsConfig(),
configuration.getConsumer().getTopic());
-
}
public void startProcessing() throws Exception {
messageConsumer.consume((notificationEvent -> {
- LOGGER.info("Message received " + notificationEvent.getResourceName());
- LOGGER.info("Submitting {} to process in thread pool", notificationEvent.getId());
- this.executorService.submit(new InboundEventProcessor(configuration, notificationEvent, dataOrchestratorEventRepository));
-
+ LOGGER.info("Message received for resource path {}", notificationEvent.getResourcePath());
+ try {
+ this.executorService.submit(new OrchestratorEventProcessor(configuration, notificationEvent));
+ } catch (Exception e) {
+ LOGGER.error("Failed tu submit data orchestrator event to process on path {}",
+ notificationEvent.getResourcePath(), e);
+ }
}));
-
- this.ouboundExecutorService
- .scheduleAtFixedRate(new OutboundEventProcessor(configuration, dataOrchestratorEventRepository),
- 0, configuration.getOutboundEventProcessor().getPollingInterval(), TimeUnit.SECONDS);
}
public Configuration getConfiguration() {
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
new file mode 100644
index 0000000..6fc7d45
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.handlers.async;
+
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
+import org.apache.airavata.datalake.drms.storage.TransferMapping;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
+import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
+import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
+import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.DirectoryMetadataResponse;
+import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
+import org.apache.airavata.mft.api.service.FileMetadataResponse;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class OrchestratorEventProcessor implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(OrchestratorEventProcessor.class);
+
+ private NotificationEvent notificationEvent;
+
+ private DRMSConnector drmsConnector;
+ private Configuration configuration;
+ private WorkflowServiceConnector workflowServiceConnector;
+
+ public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent) throws Exception {
+ this.notificationEvent = notificationEvent;
+ this.drmsConnector = new DRMSConnector(configuration);
+ this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
+ this.configuration = configuration;
+ }
+
+ private List<GenericResource> createResourceRecursively(String storageId, String basePath, String resourcePath, String resourceType, String user)
+ throws Exception{
+
+ List<GenericResource> resourceList = new ArrayList<>();
+
+ String parentType = "Storage";
+
+ String[] splitted = resourcePath.substring(basePath.length()).split("/");
+
+ String currentPath = basePath.endsWith("/")? basePath.substring(0, basePath.length() -1): basePath;
+ String parentId = storageId;
+ for (int i = 0; i < splitted.length - 1; i++) {
+ String resourceName = splitted[i];
+ currentPath = currentPath + "/" + resourceName;
+ String resourceId = Utils.getId(storageId + ":" + currentPath);
+ Optional<GenericResource> optionalGenericResource =
+ this.drmsConnector.createResource(notificationEvent.getAuthToken(),
+ notificationEvent.getTenantId(),
+ resourceId, resourceName, currentPath, parentId, "COLLECTION", parentType, user);
+ if (optionalGenericResource.isPresent()) {
+ parentId = optionalGenericResource.get().getResourceId();
+ parentType = "COLLECTION";
+ resourceList.add(optionalGenericResource.get());
+ } else {
+ logger.error("Could not create a resource for path {}", currentPath);
+ throw new Exception("Could not create a resource for path " + currentPath);
+ }
+ }
+
+ currentPath = currentPath + "/" + splitted[splitted.length -1];
+
+ Optional<GenericResource> optionalGenericResource =
+ this.drmsConnector.createResource(notificationEvent.getAuthToken(),
+ notificationEvent.getTenantId(),
+ Utils.getId(storageId + ":" + currentPath),
+ splitted[splitted.length -1], currentPath,
+ parentId, resourceType, parentType, user);
+
+ if (optionalGenericResource.isPresent()) {
+ resourceList.add(optionalGenericResource.get());
+ } else {
+ logger.error("Could not create a resource for path {}", currentPath);
+ throw new Exception("Could not create a resource for path " + currentPath);
+ }
+
+ return resourceList;
+ }
+
+
+ private void shareResources(List<GenericResource> resourceList, String admin, String user, String permission) throws Exception {
+ for (GenericResource resource : resourceList) {
+ logger.info("Sharing resource {} with path {} with user {}",
+ resource.getResourceId(), resource.getResourcePath(), user);
+ this.drmsConnector.shareWithUser(notificationEvent.getAuthToken(), notificationEvent.getTenantId(), admin, user, resource.getResourceId(), permission);
+ }
+ }
+
+ @Override
+ public void run() {
+ logger.info("Processing resource path {} on storage {}", notificationEvent.getResourcePath(),
+ notificationEvent.getBasePath());
+
+ try {
+
+ if (!"FOLDER".equals(notificationEvent.getResourceType())) {
+ logger.error("Resource {} should be a Folder type but got {}",
+ notificationEvent.getResourcePath(),
+ notificationEvent.getResourceType());
+ logger.error("Resource should be a Folder type");
+ }
+ String removeBasePath = notificationEvent.getResourcePath().substring(notificationEvent.getBasePath().length());
+ String[] splitted = removeBasePath.split("/");
+
+ String adminUser = splitted[0];
+ String owner = splitted[1].split("_")[0];
+
+ Map<String, String> ownerRules = new HashMap<>();
+ ownerRules.put(adminUser, "ADMIN");
+ ownerRules.put(splitted[1], "OWNER");
+
+ Optional<TransferMapping> optionalTransferMapping = drmsConnector.getActiveTransferMapping(
+ notificationEvent.getAuthToken(),
+ notificationEvent.getTenantId(), adminUser,
+ notificationEvent.getHostName());
+
+ if (optionalTransferMapping.isEmpty()) {
+ logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notificationEvent.getHostName());
+ throw new Exception("Could not find a transfer mapping");
+ }
+
+ TransferMapping transferMapping = optionalTransferMapping.get();
+
+ String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
+ String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
+
+ // Creating parent resource
+
+ List<GenericResource> resourceList = createResourceRecursively(sourceStorageId,
+ notificationEvent.getBasePath(),
+ notificationEvent.getResourcePath(),
+ "COLLECTION", adminUser);
+
+ shareResources(Collections.singletonList(resourceList.get(resourceList.size() -1)), adminUser, owner, "ADMIN");
+
+ GenericResource resourceObj = resourceList.get(resourceList.size() -1);
+
+ Optional<AnyStoragePreference> sourceSPOp = this.drmsConnector.getStoragePreference(
+ notificationEvent.getAuthToken(), adminUser,
+ notificationEvent.getTenantId(), sourceStorageId);
+
+ if (sourceSPOp.isEmpty()) {
+ logger.error("No storage preference found for source storage {} and user {}", sourceStorageId, adminUser);
+ throw new Exception("No storage preference found for source storage");
+ }
+
+ Optional<AnyStoragePreference> destSPOp = this.drmsConnector.getStoragePreference(
+ notificationEvent.getAuthToken(), adminUser,
+ notificationEvent.getTenantId(), destinationStorageId);
+
+ if (destSPOp.isEmpty()) {
+ logger.error("No storage preference found for destination storage {} and user {}", sourceStorageId, adminUser);
+ throw new Exception("No storage preference found for destination storage");
+ }
+
+ AnyStoragePreference sourceSP = sourceSPOp.get();
+ AnyStoragePreference destSP = destSPOp.get();
+
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(
+ this.configuration.getOutboundEventProcessor().getMftHost(),
+ this.configuration.getOutboundEventProcessor().getMftPort());
+
+ String decodedAuth = new String(Base64.getDecoder().decode(notificationEvent.getAuthToken()));
+ String[] authParts = decodedAuth.split(":");
+
+ if (authParts.length != 2) {
+ throw new Exception("Could not decode auth token to work with MFT");
+ }
+
+ DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+ .setUserId(adminUser)
+ .setClientId(authParts[0])
+ .setClientSecret(authParts[1])
+ .putProperties("TENANT_ID", notificationEvent.getTenantId()).build();
+
+ AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
+
+ FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
+ .setMftAuthorizationToken(mftAuth)
+ .setResourceId(resourceObj.getResourceId());
+
+ switch (sourceSP.getStorageCase()){
+ case SSH_STORAGE_PREFERENCE:
+ resourceMetadataReq.setResourceType("SCP");
+ resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getCredentialToken());
+ break;
+ case S3_STORAGE_PREFERENCE:
+ resourceMetadataReq.setResourceType("S3");
+ resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getCredentialToken());
+ break;
+ }
+
+ // Fetching file list for parent resource
+
+ DirectoryMetadataResponse directoryResourceMetadata = mftClient.getDirectoryResourceMetadata(resourceMetadataReq.build());
+
+ List<String> resourceIDsToProcess = new ArrayList<>();
+ for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
+ logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
+ resourceList = createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+ fileMetadata.getResourcePath(), "FILE", adminUser);
+ GenericResource fileResource = resourceList.get(resourceList.size() -1);
+
+ resourceIDsToProcess.add(fileResource.getResourceId());
+ }
+
+ for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
+ logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
+ createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+ directoryMetadata.getResourcePath(),
+ "COLLECTION", adminUser);
+ // TODO scan directories
+ }
+
+ logger.info("Creating destination zip resource for directory {}", notificationEvent.getResourcePath());
+ resourceList = createResourceRecursively(destinationStorageId, notificationEvent.getBasePath(),
+ notificationEvent.getResourcePath(), "FILE", adminUser);
+
+ GenericResource destinationResource = resourceList.get(resourceList.size() -1);
+
+ logger.info("Created destination resource {} for path {}", destinationResource.getResourceId(), destinationResource.getResourcePath());
+
+ logger.info("Submitting resources to workflow manager");
+ this.workflowServiceConnector.invokeWorkflow(notificationEvent.getAuthToken(), adminUser,
+ notificationEvent.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getCredentialToken(),
+ destinationResource.getResourceId(), destSP.getSshStoragePreference().getCredentialToken());
+
+
+ logger.info("Completed processing path {}", notificationEvent.getResourcePath());
+
+ } catch (Exception e) {
+ logger.error("Failed to process event for resource path {}", notificationEvent.getResourcePath(), e);
+ }
+ }
+}
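The processor's createResourceRecursively walks every path segment below the storage base path, registering a COLLECTION for each intermediate directory and the requested type for the leaf, chaining parent ids as it descends. A rough, self-contained sketch of that path walk; the Node record and the id scheme are illustrative, not the DRMS API.

import java.util.ArrayList;
import java.util.List;

public class PathWalkSketch {

    record Node(String id, String name, String path, String parentId, String type) {}

    static List<Node> walk(String storageId, String basePath, String resourcePath, String leafType) {
        List<Node> nodes = new ArrayList<>();
        String current = basePath.endsWith("/") ? basePath.substring(0, basePath.length() - 1) : basePath;
        String parentId = storageId;
        String[] parts = resourcePath.substring(basePath.length()).split("/");
        for (int i = 0; i < parts.length; i++) {
            current = current + "/" + parts[i];
            String id = storageId + ":" + current;                      // stand-in for Utils.getId(...)
            String type = (i == parts.length - 1) ? leafType : "COLLECTION";
            nodes.add(new Node(id, parts[i], current, parentId, type)); // child hangs off the previous node
            parentId = id;
        }
        return nodes;
    }

    public static void main(String[] args) {
        walk("storage-1", "/base/", "/base/admin/projectA_user/run1/data.csv", "FILE")
                .forEach(System.out::println);
    }
}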
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
deleted file mode 100644
index 62bd6ff..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.processor;
-
-import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.Utils;
-import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.dozer.DozerBeanMapper;
-import org.dozer.loader.api.BeanMappingBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * This class is responsible for pick events from kafka queue and publish them into inmemory store
- */
-public class InboundEventProcessor implements MessageProcessor<Configuration> {
- private static final Logger LOGGER = LoggerFactory.getLogger(InboundEventProcessor.class);
- private Configuration configuration;
- private NotificationEvent notificationEvent;
- private DozerBeanMapper dozerBeanMapper;
-
- private DataOrchestratorEventRepository repository;
-
- public InboundEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
- DataOrchestratorEventRepository repository) throws Exception {
- this.notificationEvent = notificationEvent;
- this.repository = repository;
- this.init(configuration);
- }
-
- @Override
- public void init(Configuration configuration) throws Exception {
- try {
- this.configuration = configuration;
- dozerBeanMapper = new DozerBeanMapper();
- BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
- @Override
- protected void configure() {
- mapping(NotificationEvent.class, DataOrchestratorEntity.class);
- }
- };
- dozerBeanMapper.addMapping(orchestratorEventMapper);
- } catch (Exception exception) {
- LOGGER.error(" Error occurred while initiating Inbound event processor ", exception);
- throw exception;
- }
-
- }
-
- @Override
- public void close() throws Exception {
-
- }
-
- @Override
- public void run() {
- try {
- LOGGER.info("Inbound event processor received event " + notificationEvent.getResourceId());
- String typeStr = this.configuration.getMessageFilter().getResourceType();
- String[] allowedTypes = typeStr.split(",");
- boolean proceed = false;
- long size = Arrays.stream(allowedTypes).filter(type ->
- type.equals(notificationEvent.getResourceType())).count();
- if (size == 0) {
- return;
- }
- String eventTypeStr = this.configuration.getMessageFilter().getEventType();
- String[] eventTypes = eventTypeStr.split(",");
- long eventSize = Arrays.stream(eventTypes).filter(type ->
- type.trim().equals(notificationEvent.getContext().getEvent().name())).count();
- if (eventSize == 0) {
- return;
- }
-
- String pattern = this.configuration.getMessageFilter().getResourceNameExclusions();
-
- // Create a Pattern object
- Pattern r = Pattern.compile(pattern);
-
- // Now create matcher object.
- Matcher m = r.matcher(notificationEvent.getResourceName());
-
- if (m.find()) {
- return;
- }
-
- DataOrchestratorEntity entity = createEntity(notificationEvent);
- repository.save(entity);
- } catch (Exception exception) {
- LOGGER.error("Error occurred while pre processing event {}", this.notificationEvent.getResourceId(), exception);
- }
-
- }
-
- private DataOrchestratorEntity createEntity(NotificationEvent event) throws NoSuchAlgorithmException {
- DataOrchestratorEntity entity = dozerBeanMapper.map(event, DataOrchestratorEntity.class);
- entity.setOccurredTime(new Date(event.getContext().getOccuredTime()));
- entity.setEventStatus(EventStatus.DATA_ORCH_RECEIVED.name());
- entity.setEventType(event.getContext().getEvent().name());
- entity.setAuthToken(event.getContext().getAuthToken());
- entity.setHostName(event.getContext().getHostName());
-
- String resourcePath = event.getResourcePath();
- String basePath = event.getContext().getBasePath();
- String removeBasePath = resourcePath.substring(basePath.length());
- String[] splitted = removeBasePath.split("/");
-
- OwnershipEntity owner1 = new OwnershipEntity();
- owner1.setId(UUID.randomUUID().toString());
- owner1.setUserId(splitted[1]);
- owner1.setPermissionId("OWNER");
- owner1.setDataOrchestratorEntity(entity);
-
- OwnershipEntity owner2 = new OwnershipEntity();
- owner2.setId(UUID.randomUUID().toString());
- owner2.setUserId(splitted[0]);
- owner2.setPermissionId("ADMIN");
- owner2.setDataOrchestratorEntity(entity);
-
-
- Set<OwnershipEntity> owners = new HashSet<>();
- owners.add(owner1);
- owners.add(owner2);
-
- entity.setOwnershipEntities(owners);
-
- entity.setTenantId(event.getContext().getTenantId());
-
- String authDecoded = new String(Base64.getDecoder()
- .decode(event.getContext().getAuthToken().getBytes(StandardCharsets.UTF_8)));
- String agentId = authDecoded.split(":")[0];
- entity.setAgentId(agentId);
- entity.setResourceId(Utils.getId(event.getResourceId()));
- return entity;
- }
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
deleted file mode 100644
index 4b04005..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ /dev/null
@@ -1,197 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.processor;
-
-import org.apache.airavata.datalake.drms.resource.GenericResource;
-import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
-import org.apache.airavata.datalake.drms.storage.TransferMapping;
-import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.Utils;
-import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
-import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
-import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.dozer.DozerBeanMapper;
-import org.dozer.loader.api.BeanMappingBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * This class is responsible and publish events to registry and
- * Workflow engine
- */
-public class OutboundEventProcessor implements MessageProcessor<Configuration> {
- private static final Logger LOGGER = LoggerFactory.getLogger(OutboundEventProcessor.class);
-
- private DozerBeanMapper dozerBeanMapper;
- private DataOrchestratorEventRepository repository;
-
- private DRMSConnector drmsConnector;
- private WorkflowServiceConnector workflowServiceConnector;
-
- public OutboundEventProcessor(Configuration configuration, DataOrchestratorEventRepository repository) throws Exception {
- this.repository = repository;
- this.init(configuration);
- }
-
- @Override
- public void init(Configuration configuration) throws Exception {
- this.drmsConnector = new DRMSConnector(configuration);
- this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
- dozerBeanMapper = new DozerBeanMapper();
- BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
- @Override
- protected void configure() {
- mapping(NotificationEvent.class, DataOrchestratorEntity.class);
- }
- };
- dozerBeanMapper.addMapping(orchestratorEventMapper);
-
- }
-
- @Override
- public void close() throws Exception {
- this.drmsConnector.close();
- this.workflowServiceConnector.close();
- }
-
-
- @Override
- public void run() {
-
- try {
- List<DataOrchestratorEntity> orchestratorEntityList = this.repository
- .findAllEntitiesWithGivenStatus(EventStatus.DATA_ORCH_RECEIVED.name());
- Map<String, List<DataOrchestratorEntity>> entityMap = new HashMap<>();
- orchestratorEntityList.forEach(entity -> {
- entityMap.computeIfAbsent(entity.getResourceId(), list -> new ArrayList()).add(entity);
- });
- entityMap.forEach((key, value) -> {
- try {
- DataOrchestratorEntity entity = value.remove(0);
- processEvent(entity);
- value.forEach(val -> {
- val.setEventStatus(EventStatus.DATA_ORCH_PROCESSED_AND_SKIPPED.name());
- repository.save(val);
- });
- } catch (Exception e) {
- LOGGER.error("Errored while processing event", e);
- }
- });
- } catch (Exception ex) {
- LOGGER.error("Error while processing events", ex);
- }
-
- }
-
- private void processEvent(DataOrchestratorEntity entity) {
- try {
-
- // TODO move this logic to file listener as this is EMC specific
- Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
-
- String resourcePath = entity.getResourcePath();
- String tail = resourcePath.substring(resourcePath.indexOf(adminOp.get().getUserId()));
-
- String[] collections = tail.split("/");
-
- Optional<TransferMapping> optionalStorPref = drmsConnector.getActiveTransferMapping(entity, entity.getHostName());
- if (optionalStorPref.isEmpty()) {
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Storage not found for host: " + entity.getHostName());
- repository.save(entity);
- return;
- }
-
- TransferMapping transferMapping = optionalStorPref.get();
- String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
- String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
- String parentType = "Storage";
-
- String parentId = sourceStorageId;
- for (int i = 0; i < collections.length - 1; i++) {
- String resourceName = collections[i];
- String path = entity.getResourcePath().substring(0, entity.getResourcePath().indexOf(resourceName));
- path = path.concat(resourceName);
- String entityId = Utils.getId(path);
- Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, parentId, "COLLECTION", parentType);
- if (optionalGenericResource.isPresent()) {
- parentId = optionalGenericResource.get().getResourceId();
- parentType = "COLLECTION";
- } else {
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Collection structure creation failed: " + entity.getHostName());
- repository.save(entity);
- return;
- }
- }
-
- Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(repository, entity, entity.getResourceId(),
- collections[collections.length - 1], entity.getResourcePath(),
- parentId, "FILE", parentType);
-
- String destinationResourceId = destinationStorageId + ":" + entity.getResourcePath() + ":" + entity.getResourceType();
- String messageId = Utils.getId(destinationResourceId);
-
- Optional<GenericResource> destinationFile = this.drmsConnector.createResource(repository, entity, messageId,
- entity.getResourceName(),
- entity.getResourcePath(),
- destinationStorageId,
- "FILE", "Storage");
-
- if (optionalGenericResource.isPresent() && destinationFile.isPresent()) {
- try {
-
- Optional<AnyStoragePreference> storagePreferenceOptional = this.drmsConnector
- .getStoragePreference(entity.getAuthToken(), adminOp.get().getUserId(), entity.getTenantId(), sourceStorageId);
-
- Optional<AnyStoragePreference> destinationPreferenceOptional = this.drmsConnector
- .getStoragePreference(entity.getAuthToken(), adminOp.get().getUserId(), entity.getTenantId(), destinationStorageId);
- if (storagePreferenceOptional.isPresent() && destinationPreferenceOptional.isPresent()) {
- String sourceCredentialToken = storagePreferenceOptional.get()
- .getSshStoragePreference()
- .getCredentialToken();
- String destinationCredentialToken = storagePreferenceOptional.get()
- .getSshStoragePreference()
- .getCredentialToken();
-
- this.workflowServiceConnector.invokeWorkflow(entity.getAuthToken(), adminOp.get().getUserId(),
- entity.getTenantId(), entity.getResourceId(), sourceCredentialToken,
- messageId, destinationCredentialToken);
- entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
- repository.save(entity);
- } else {
- String msg = "Cannot find storage preference for storage " + sourceStorageId + " for user " + adminOp.get().getUserId();
- entity.setError(msg);
- entity.setEventStatus(EventStatus.ERRORED.name());
- repository.save(entity);
- LOGGER.error(msg);
- }
-
-
- } catch (Exception exception) {
- String msg = "Error occurred while invoking workflow manager" + exception.getMessage();
- entity.setError("Error occurred while invoking workflow manager " + exception.getMessage());
- entity.setEventStatus(EventStatus.ERRORED.name());
- repository.save(entity);
- LOGGER.error(msg, exception);
- }
- }
- } catch (Exception exception) {
- LOGGER.error("Error occurred while processing outbound data orchestrator event", exception);
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Error occurred while processing ");
- repository.save(entity);
- }
- }
-
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
index df6edf7..ab077cc 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
@@ -60,6 +60,7 @@ message DataParsingJobOutput {
string dataParserOutputInterfaceId = 2;
string dataParsingJobId = 3;
string outputType = 4;
+ string outputDirectoryResourceId = 5;
}
message DataParsingJob {
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
index af77f3e..7482c7a 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
@@ -90,115 +90,118 @@ public class DataParsingWorkflowManager {
public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws Exception {
WorkflowMessage workflowMessage = request.getMessage();
- logger.info("Processing parsing workflow for resource {}", workflowMessage.getSourceResourceId());
-
- MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);
-
- DelegateAuth delegateAuth = DelegateAuth.newBuilder()
- .setUserId(workflowMessage.getUsername())
- .setClientId(mftClientId)
- .setClientSecret(mftClientSecret)
- .putProperties("TENANT_ID", workflowMessage.getTenantId()).build();
-
- FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
- .setResourceType("SCP")
- .setResourceId(workflowMessage.getSourceResourceId())
- .setResourceToken(workflowMessage.getSourceCredentialToken())
- .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
-
- ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
- DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
-
- ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
-
- Map<String, StringMap> parserInputMappings = new HashMap<>();
- List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
- List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
-
- boolean match = true;
- StringMap stringMap = new StringMap();
- for (DataParsingJobInput pji : pjis) {
-
- ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
- Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
- bindings.put("polyglot.js.allowHostAccess", true);
- bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
- bindings.put("metadata", metadata);
- try {
- Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
- stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
- match = match && eval;
- } catch (ScriptException e) {
- logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
- match = false;
+
+ for (String sourceResourceId : workflowMessage.getSourceResourceIdsList()) {
+ logger.info("Processing parsing workflow for resource {}", sourceResourceId);
+
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);
+
+ DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+ .setUserId(workflowMessage.getUsername())
+ .setClientId(mftClientId)
+ .setClientSecret(mftClientSecret)
+ .putProperties("TENANT_ID", workflowMessage.getTenantId()).build();
+
+ FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
+ .setResourceType("SCP")
+ .setResourceId(sourceResourceId)
+ .setResourceToken(workflowMessage.getSourceCredentialToken())
+ .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
+
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
+ DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
+
+ ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
+
+ Map<String, StringMap> parserInputMappings = new HashMap<>();
+ List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
+ List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
+
+ boolean match = true;
+ StringMap stringMap = new StringMap();
+ for (DataParsingJobInput pji : pjis) {
+
+ ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
+ Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
+ bindings.put("polyglot.js.allowHostAccess", true);
+ bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
+ bindings.put("metadata", metadata);
+ try {
+ Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
+ stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
+ match = match && eval;
+ } catch (ScriptException e) {
+ logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
+ match = false;
+ }
}
- }
- if (match) {
- parserInputMappings.put(pj.getParserId(), stringMap);
- }
- return match;
- }).collect(Collectors.toList());
-
- Map<String, AbstractTask> taskMap = new HashMap<>();
-
- SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
- downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
- downloadTask.setMftClientId(mftClientId);
- downloadTask.setMftClientSecret(mftClientSecret);
- downloadTask.setUserId(workflowMessage.getUsername());
- downloadTask.setTenantId(workflowMessage.getTenantId());
- downloadTask.setMftHost(mftHost);
- downloadTask.setMftPort(mftPort);
- downloadTask.setSourceResourceId(workflowMessage.getSourceResourceId());
- downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
-
- taskMap.put(downloadTask.getTaskId(), downloadTask);
-
- for(String parserId: parserInputMappings.keySet()) {
-
- GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
- dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
- dataParsingTask.setParserId(parserId);
- dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
- taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
-
- OutPort outPort = new OutPort();
- outPort.setNextTaskId(dataParsingTask.getTaskId());
- downloadTask.addOutPort(outPort);
-
- DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj -> pj.getParserId().equals(parserId)).findFirst().get();
- ParserFetchResponse parser = parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
-
- for (DataParserOutputInterface dataParserOutputInterface: parser.getParser().getOutputInterfacesList()) {
-
- Optional<DataParsingJobOutput> dataParsingJobOutput = dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
- o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
- .findFirst();
-
- if (dataParsingJobOutput.isPresent() && dataParsingJobOutput.get().getOutputType().equals("JSON")) {
- MetadataPersistTask mpt = new MetadataPersistTask();
- mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
- mpt.setDrmsHost(drmsHost);
- mpt.setDrmsPort(drmsPort);
- mpt.setTenant(workflowMessage.getTenantId());
- mpt.setUser(workflowMessage.getUsername());
- mpt.setServiceAccountKey(mftClientId);
- mpt.setServiceAccountSecret(mftClientSecret);
- mpt.setResourceId(workflowMessage.getSourceResourceId());
- mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
- OutPort dpOut = new OutPort();
- dpOut.setNextTaskId(mpt.getTaskId());
- dataParsingTask.addOutPort(dpOut);
- taskMap.put(mpt.getTaskId(), mpt);
+ if (match) {
+ parserInputMappings.put(pj.getParserId(), stringMap);
+ }
+ return match;
+ }).collect(Collectors.toList());
+
+ Map<String, AbstractTask> taskMap = new HashMap<>();
+
+ SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
+ downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
+ downloadTask.setMftClientId(mftClientId);
+ downloadTask.setMftClientSecret(mftClientSecret);
+ downloadTask.setUserId(workflowMessage.getUsername());
+ downloadTask.setTenantId(workflowMessage.getTenantId());
+ downloadTask.setMftHost(mftHost);
+ downloadTask.setMftPort(mftPort);
+ downloadTask.setSourceResourceId(sourceResourceId);
+ downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
+
+ taskMap.put(downloadTask.getTaskId(), downloadTask);
+
+ for(String parserId: parserInputMappings.keySet()) {
+
+ GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
+ dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
+ dataParsingTask.setParserId(parserId);
+ dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
+ taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
+
+ OutPort outPort = new OutPort();
+ outPort.setNextTaskId(dataParsingTask.getTaskId());
+ downloadTask.addOutPort(outPort);
+
+ DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj -> pj.getParserId().equals(parserId)).findFirst().get();
+ ParserFetchResponse parser = parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
+
+ for (DataParserOutputInterface dataParserOutputInterface: parser.getParser().getOutputInterfacesList()) {
+
+ Optional<DataParsingJobOutput> dataParsingJobOutput = dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
+ o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
+ .findFirst();
+
+ if (dataParsingJobOutput.isPresent() && dataParsingJobOutput.get().getOutputType().equals("JSON")) {
+ MetadataPersistTask mpt = new MetadataPersistTask();
+ mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
+ mpt.setDrmsHost(drmsHost);
+ mpt.setDrmsPort(drmsPort);
+ mpt.setTenant(workflowMessage.getTenantId());
+ mpt.setUser(workflowMessage.getUsername());
+ mpt.setServiceAccountKey(mftClientId);
+ mpt.setServiceAccountSecret(mftClientSecret);
+ mpt.setResourceId(sourceResourceId);
+ mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
+ OutPort dpOut = new OutPort();
+ dpOut.setNextTaskId(mpt.getTaskId());
+ dataParsingTask.addOutPort(dpOut);
+ taskMap.put(mpt.getTaskId(), mpt);
+ }
}
- }
- }
+ }
- String[] startTaskIds = {downloadTask.getTaskId()};
- String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
+ String[] startTaskIds = {downloadTask.getTaskId()};
+ String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
- logger.info("Submitted workflow {} to parse resource {}", workflowId, workflowMessage.getSourceResourceId());
+ logger.info("Submitted workflow {} to parse resource {}", workflowId, sourceResourceId);
+ }
}
}
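The loop above now builds one task graph per entry in source_resource_ids: a local download task fans out to one parsing task per matched parser, and every JSON output gets a metadata persist task hung off its parser. A simplified sketch of that wiring, with Task standing in for the real AbstractTask/OutPort classes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class ParsingDagSketch {

    static class Task {
        final String id;
        final List<String> next = new ArrayList<>();
        Task(String prefix) { this.id = prefix + "-" + UUID.randomUUID(); }
    }

    static Map<String, Task> buildDag(List<String> matchedParserIds) {
        Map<String, Task> dag = new LinkedHashMap<>();
        Task download = new Task("SLDT");                // sync local download
        dag.put(download.id, download);
        for (String parserId : matchedParserIds) {
            Task parse = new Task("DPT");                // generic data parsing
            download.next.add(parse.id);
            dag.put(parse.id, parse);
            Task persist = new Task("MPT");              // metadata persist for JSON outputs
            parse.next.add(persist.id);
            dag.put(persist.id, persist);
        }
        return dag;
    }

    public static void main(String[] args) {
        buildDag(List.of("parser-a", "parser-b"))
                .values().forEach(t -> System.out.println(t.id + " -> " + t.next));
    }
}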
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
index d12a365..ff0ee59 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
@@ -90,8 +90,6 @@ public class DataSyncWorkflowManager {
@Value("${custos.secret}")
private String custosSecret;
-
-
@Value("${drms.host}")
private String drmsHost;
@@ -214,7 +212,7 @@ public class DataSyncWorkflowManager {
// dt1.setMftCallbackStoreHost(datasyncWmHost);
// dt1.setMftCallbackStorePort(datasyncWmPort);
- DataTransferPreValidationTask dt1 = new DataTransferPreValidationTask();
+ /*DataTransferPreValidationTask dt1 = new DataTransferPreValidationTask();
dt1.setTenantId(request.getMessage().getTenantId());
dt1.setCustosHost(custosHost);
dt1.setCustosPort(custosPort);
@@ -239,6 +237,6 @@ public class DataSyncWorkflowManager {
//String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
String[] startTaskIds = {dt1.getTaskId()};
String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
- logger.info("Launched workflow {}", workflowId);
+ logger.info("Launched workflow {}", workflowId);*/
}
}
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
index 29be8a4..8128ff3 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
@@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
@GRpcService
public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServiceImplBase {
@@ -43,13 +45,22 @@ public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServic
@Autowired
private DataParsingWorkflowManager dataParsingWorkflowManager;
+ private final ScheduledExecutorService workflowExecutorService = Executors.newSingleThreadScheduledExecutor();
+
@Override
public void invokeWorkflow(WorkflowInvocationRequest request,
StreamObserver<WorkflowInvocationResponse> responseObserver) {
try {
- logger.info("Invoking workflow executor for resource {}", request.getMessage().getSourceResourceId());
- //dataSyncWorkflowManager.submitDataSyncWorkflow(request);
- dataParsingWorkflowManager.submitDataParsingWorkflow(request);
+
+ logger.info("Invoking workflow executor");
+ workflowExecutorService.submit(() -> {
+ try {
+ dataParsingWorkflowManager.submitDataParsingWorkflow(request);
+ } catch (Exception e) {
+ logger.error("Failed to submit data parsing workflow for destination resource {}",
+ request.getMessage().getDestinationResourceId(), e);
+ }
+ });
responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
responseObserver.onCompleted();
} catch (Exception ex) {
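The handler change above acknowledges the gRPC call right away and runs the workflow build on a dedicated single-thread executor, so slow MFT metadata lookups no longer hold the RPC open. A stripped-down sketch of that hand-off; the types are placeholders, not the generated gRPC stubs.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncInvokeSketch {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Returns the acknowledgement before the submitted work completes.
    boolean invoke(Runnable workflowSubmission) {
        executor.submit(() -> {
            try {
                workflowSubmission.run();
            } catch (Exception e) {
                e.printStackTrace(); // the caller was already acknowledged; just record the failure
            }
        });
        return true;
    }

    public static void main(String[] args) {
        AsyncInvokeSketch handler = new AsyncInvokeSketch();
        System.out.println("ack=" + handler.invoke(() -> System.out.println("building workflow")));
        handler.executor.shutdown();
    }
}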
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index c1b5da3..532aff2 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -26,8 +26,7 @@ import "google/protobuf/empty.proto";
message WorkflowMessage {
string message_id = 1;
- string resource_id = 2;
- string source_resource_id = 3;
+ repeated string source_resource_ids = 3;
string destination_resource_id = 4;
string username = 5;
string tenantId = 6;
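With source_resource_ids now repeated, the orchestrator batches every file registered during the directory scan into a single invocation. A hedged fragment showing how such a message might be assembled with the builder generated from this proto; the ids and tenant below are made-up values.

// Assumes the Java classes generated from WorkflowService.proto are on the classpath.
WorkflowMessage message = WorkflowMessage.newBuilder()
        .setMessageId(java.util.UUID.randomUUID().toString())
        .addAllSourceResourceIds(java.util.List.of("file-resource-1", "file-resource-2"))
        .setDestinationResourceId("destination-zip-resource")
        .setUsername("adminUser")
        .setTenantId("tenant-a")
        .build();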