You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/08/23 11:37:20 UTC

[airavata-data-lake] branch master updated: Rewriting data orchestrator logic to support directory scanning through MFT

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new ef6a5b0  Rewriting data orchestrator logic to support directory scanning through MFT
ef6a5b0 is described below

commit ef6a5b0f0d748a459b59c331f33c032632fa0fbb
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Aug 23 07:37:09 2021 -0400

    Rewriting data orchestrator logic to support directory scanning through MFT
---
 ansible/inventories/test/group_vars/all/vars.yml   |   2 +-
 .../templates/data-orchestrator/config.yml.j2      |   2 +
 .../clients/core/AbstractListener.java             |  15 +-
 .../clients/core/EventPublisher.java               |   3 +-
 .../file/client/model/Configuration.java           |   9 +
 .../file/client/publisher/FileEventPublisher.java  |   5 +-
 .../file/client/watcher/FileWatcher.java           |  77 +++---
 .../messaging/MessagingEvents.java                 |   9 -
 .../messaging/model/NotificationEvent.java         | 128 ++++------
 .../model/NotificationEventDeserializer.java       |  21 +-
 .../model/NotificationEventSerializer.java         |  16 +-
 .../messaging/publisher/MessageProducer.java       |   2 +-
 .../persistance/entity/DataOrchestratorEntity.java | 226 ------------------
 .../persistance/entity/OwnershipEntity.java        |  56 -----
 .../persistance/entity/WorkflowEntity.java         | 108 ---------
 .../persistance/entity/WorkflowTaskEntity.java     | 118 ----------
 .../entity/parser/DataParsingJobOutputEntity.java  |  11 +
 .../DataOrchestratorEventRepository.java           |  32 ---
 .../repository/WorkflowEntityRepository.java       |   7 -
 .../datalake/orchestrator/Configuration.java       |  18 ++
 .../orchestrator/connectors/DRMSConnector.java     |  73 ++----
 .../connectors/WorkflowServiceConnector.java       |   6 +-
 .../handlers/async/OrchestratorEventHandler.java   |  24 +-
 .../handlers/async/OrchestratorEventProcessor.java | 260 +++++++++++++++++++++
 .../processor/InboundEventProcessor.java           | 145 ------------
 .../processor/OutboundEventProcessor.java          | 197 ----------------
 .../src/main/proto/parsing.proto                   |   1 +
 .../wm/datasync/DataParsingWorkflowManager.java    | 211 ++++++++---------
 .../wm/datasync/DataSyncWorkflowManager.java       |   6 +-
 .../wm/datasync/WorkflowEngineAPIHandler.java      |  17 +-
 .../src/main/proto/service/WorkflowService.proto   |   3 +-
 31 files changed, 577 insertions(+), 1231 deletions(-)

diff --git a/ansible/inventories/test/group_vars/all/vars.yml b/ansible/inventories/test/group_vars/all/vars.yml
index 8463bab..134537b 100644
--- a/ansible/inventories/test/group_vars/all/vars.yml
+++ b/ansible/inventories/test/group_vars/all/vars.yml
@@ -42,7 +42,7 @@ custos_repo: "https://github.com/apache/airavata-custos.git"
 custos_git_branch: develop
 
 mft_default_agent_id: agent0
-mft_default_agent_host: 10.1.0.33
+mft_default_agent_host: 10.1.0.42
 mft_default_agent_advertised_url: https://beta.iubemcenter.scigap.org:8443/downloads
 mft_default_agent_port: 3333
 
diff --git a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2 b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
index 40f6fdb..914245a 100644
--- a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
+++ b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
@@ -10,6 +10,8 @@ outboundEventProcessor:
   workflowPort: {{ workflow_manager_grpc_port }}
   drmsHost: "{{ datalake_drms_host }}"
   drmsPort: {{ datalake_drms_grpc_port }}
+  mftHost: "{{ mft_api_service_host }}"
+  mftPort: {{ mft_api_service_grpc_port }}
 consumer:
   brokerURL: "{{ datalake_data_orch_broker_url }}"
   consumerGroup: "{{ datalake_data_orch_broker_consumer_group }}"
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
index ba9932b..5395376 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
@@ -1,6 +1,5 @@
 package org.apache.airavata.dataorchestrator.clients.core;
 
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
 import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,28 +21,28 @@ public abstract class AbstractListener implements EventListener {
 
     public void onRegistered(NotificationEvent event) throws Exception {
         LOGGER.info(" Registration event received for path " + event.getResourcePath());
-        eventPublisher.publish(event, MessagingEvents.REGISTER);
+        eventPublisher.publish(event, NotificationEvent.Type.REGISTER);
 
     }
 
     public void onCreated(NotificationEvent event) throws Exception {
         LOGGER.info(event.getResourceType() + " " +
-                event.getResourcePath() + ":" + event.getResourceName() + " Created");
-        eventPublisher.publish(event, MessagingEvents.CREATE);
+                event.getResourcePath() + ":" + event.getResourcePath() + " Created");
+        eventPublisher.publish(event, NotificationEvent.Type.CREATE);
 
     }
 
     public void onModified(NotificationEvent event) throws Exception {
         LOGGER.info(event.getResourceType() + " " +
-                event.getResourcePath() + ":" + event.getResourceName() + " Created");
-        eventPublisher.publish(event, MessagingEvents.MODIFY);
+                event.getResourcePath() + ":" + event.getResourcePath() + " Created");
+        eventPublisher.publish(event, NotificationEvent.Type.MODIFY);
 
     }
 
     public void onDeleted(NotificationEvent event) throws Exception {
         LOGGER.info(event.getResourceType() + " " +
-                event.getResourcePath() + ":" + event.getResourceName() + " Created");
-        eventPublisher.publish(event, MessagingEvents.DELETE);
+                event.getResourcePath() + ":" + event.getBasePath() + " Created");
+        eventPublisher.publish(event, NotificationEvent.Type.DELETE);
 
     }
 
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
index 53b40c9..8b52c79 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
@@ -1,6 +1,5 @@
 package org.apache.airavata.dataorchestrator.clients.core;
 
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
 import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
 
 import java.util.concurrent.ExecutionException;
@@ -11,7 +10,7 @@ import java.util.concurrent.ExecutionException;
 public interface EventPublisher {
 
 
-    public void publish(NotificationEvent notificationEvent, MessagingEvents event) throws Exception;
+    public void publish(NotificationEvent notificationEvent, NotificationEvent.Type eventType) throws Exception;
 
 
 }
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
index 630db9c..d4a42df 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
@@ -3,6 +3,7 @@ package org.apache.airavata.dataorchestrator.file.client.model;
 public class Configuration {
     private String listeningPath;
     private String hostName;
+    private int depth = 2;
 
     private Producer producer;
 
@@ -43,6 +44,14 @@ public class Configuration {
         this.custos = custos;
     }
 
+    public int getDepth() {
+        return depth;
+    }
+
+    public void setDepth(int depth) {
+        this.depth = depth;
+    }
+
     public static class Producer {
         private String brokerURL;
         private String publisherId;
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
index 82b8199..a228d1c 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
@@ -2,7 +2,6 @@ package org.apache.airavata.dataorchestrator.file.client.publisher;
 
 import org.apache.airavata.dataorchestrator.clients.core.EventPublisher;
 import org.apache.airavata.dataorchestrator.file.client.model.Configuration;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
 import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
 import org.apache.airavata.dataorchestrator.messaging.publisher.MessageProducer;
 import org.apache.kafka.clients.producer.Callback;
@@ -25,8 +24,8 @@ public class FileEventPublisher implements EventPublisher {
     }
 
     @Override
-    public void publish(NotificationEvent notificationEvent, MessagingEvents event) throws ExecutionException, InterruptedException {
-        notificationEvent.getContext().setEvent(event);
+    public void publish(NotificationEvent notificationEvent, NotificationEvent.Type eventType) throws ExecutionException, InterruptedException {
+        notificationEvent.setEventType(eventType);
         messageProducer.publish(configuration.getProducer().getPublisherTopic(), notificationEvent, new Callback() {
             @Override
             public void onCompletion(RecordMetadata recordMetadata, Exception e) {
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
index 9e16018..c1bf220 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
@@ -29,15 +29,12 @@ public class FileWatcher implements Runnable {
 
     private Configuration configuration;
 
-
     public FileWatcher(File rootFolder, Configuration configuration) throws IOException {
         this.rootFolder = rootFolder;
         this.configuration = configuration;
     }
 
-
     @Override
-
     public void run() {
 
         LOGGER.info("Watcher service starting at " + rootFolder.getAbsolutePath());
@@ -51,12 +48,9 @@ public class FileWatcher implements Runnable {
         } catch (Exception e) {
             LOGGER.error("Error occurred while watching  folder " + rootFolder.getAbsolutePath(), e);
             Thread.currentThread().interrupt();
-
         }
-
     }
 
-
     protected void pollEvents(WatchService watchService) throws Exception {
 
         WatchKey key = watchService.take();
@@ -68,10 +62,10 @@ public class FileWatcher implements Runnable {
         if (!key.reset()) {
             keyPathMap.remove(key);
         }
+
         if (keyPathMap.isEmpty()) {
             return;
         }
-
     }
 
 
@@ -82,12 +76,14 @@ public class FileWatcher implements Runnable {
 
         path = parentPath.resolve(path);
         File file = path.toFile();
-        FileEvent event = getFileEvent(file);
+        Optional<FileEvent> event = getFileEvent(file);
 
         if (kind == ENTRY_CREATE) {
 
-            for (AbstractListener listener : listeners) {
-                listener.onCreated(event);
+            if (event.isPresent()) {
+                for (AbstractListener listener : listeners) {
+                    listener.onCreated(event.get());
+                }
             }
 
             if (file.isDirectory()) {
@@ -96,23 +92,22 @@ public class FileWatcher implements Runnable {
 
         } else if (kind == ENTRY_MODIFY) {
 
-            for (AbstractListener listener : listeners) {
-
-                listener.onModified(event);
-
+            if (event.isPresent()) {
+                for (AbstractListener listener : listeners) {
+                    listener.onModified(event.get());
+                }
             }
 
         } else if (kind == ENTRY_DELETE) {
 
-            for (AbstractListener listener : listeners) {
-                listener.onDeleted(event);
+            if (event.isPresent()) {
+                for (AbstractListener listener : listeners) {
+                    listener.onDeleted(event.get());
+                }
             }
-
         }
-
     }
 
-
     public FileWatcher addListener(AbstractListener listener) {
 
         listeners.add(listener);
@@ -152,24 +147,44 @@ public class FileWatcher implements Runnable {
      */
 
 
-    protected FileEvent getFileEvent(File file) {
+    protected Optional<FileEvent> getFileEvent(File file) {
         FileEvent event = new FileEvent();
-        if (file.isDirectory()) {
+
+
+        String absolutePath = file.getAbsolutePath();
+        if (configuration.getDepth() > 0) {
+            String relativePath = absolutePath.substring(configuration.getListeningPath().length());
+            if (relativePath.startsWith("/")) {
+                relativePath = relativePath.substring(1);
+            }
+            String[] relativeParts = relativePath.split("/");
+            if (relativeParts.length >= configuration.getDepth()) {
+                String beginPath = configuration.getListeningPath();
+                beginPath = beginPath.endsWith("/") ? beginPath.substring(0, beginPath.length()-1) : beginPath;
+                for (int step = 0; step < configuration.getDepth(); step++) {
+                    beginPath = beginPath + "/" + relativeParts[step];
+                }
+                absolutePath = beginPath;
+            } else {
+                LOGGER.warn("Depth of path {} is not greater or equal to required depth {}", absolutePath, configuration.getDepth());
+                return Optional.empty();
+            }
+        }
+
+        if (new File(absolutePath).isDirectory()) {
             event.setResourceType(Constants.FOLDER);
         } else {
             event.setResourceType(Constants.FILE);
         }
-        event.setResourceName(file.getName());
-        event.setResourcePath(file.getAbsolutePath());
-        NotificationEvent.Context context = new NotificationEvent.Context();
-        context.setOccuredTime(System.currentTimeMillis());
-        context.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()
+
+        event.setResourcePath(absolutePath);
+        event.setOccuredTime(System.currentTimeMillis());
+        event.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()
                 + ":" + configuration.getCustos().getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)));
-        context.setBasePath(configuration.getListeningPath());
-        context.setTenantId(configuration.getCustos().getTenantId());
-        context.setHostName(configuration.getHostName());
-        event.setContext(context);
-        return event;
+        event.setBasePath(configuration.getListeningPath());
+        event.setTenantId(configuration.getCustos().getTenantId());
+        event.setHostName(configuration.getHostName());
+        return Optional.of(event);
     }
 
     private static void registerDir(Path path, WatchService watchService) throws
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java
deleted file mode 100644
index 3771b72..0000000
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.airavata.dataorchestrator.messaging;
-
-public enum MessagingEvents {
-    
-    REGISTER,
-    CREATE,
-    MODIFY,
-    DELETE
-}
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
index 1cda9db..baecaeb 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
@@ -1,26 +1,25 @@
 package org.apache.airavata.dataorchestrator.messaging.model;
 
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
-
-import java.io.Serializable;
-import java.util.UUID;
-
 /**
  * Notification event represents triggering messages
  */
 public class  NotificationEvent {
-    private String resourcePath;
-    private String resourceName;
-    private String resourceType;
-    private Context context;
-    private String id;
-
-
-    public NotificationEvent() {
-        this.id = UUID.randomUUID().toString();
 
+    public enum Type {
+        REGISTER,
+        CREATE,
+        MODIFY,
+        DELETE
     }
 
+    private String resourcePath;
+    private String resourceType;
+    private Long occuredTime;
+    private String authToken;
+    private String tenantId;
+    private String hostName;
+    private String basePath;
+    private Type eventType;
 
     public String getResourcePath() {
         return resourcePath;
@@ -30,14 +29,6 @@ public class  NotificationEvent {
         this.resourcePath = resourcePath;
     }
 
-    public String getResourceName() {
-        return resourceName;
-    }
-
-    public void setResourceName(String resourceName) {
-        this.resourceName = resourceName;
-    }
-
     public String getResourceType() {
         return resourceType;
     }
@@ -46,84 +37,51 @@ public class  NotificationEvent {
         this.resourceType = resourceType;
     }
 
-    public Context getContext() {
-        return context;
+    public Long getOccuredTime() {
+        return occuredTime;
     }
 
-    public void setContext(Context context) {
-        this.context = context;
+    public void setOccuredTime(Long occuredTime) {
+        this.occuredTime = occuredTime;
     }
 
-    public String getId() {
-        return id;
+    public String getAuthToken() {
+        return authToken;
     }
 
-    public void setId(String id) {
-        this.id = id;
+    public void setAuthToken(String authToken) {
+        this.authToken = authToken;
     }
 
-    public static class Context implements Serializable {
-
-        private MessagingEvents event;
-        private Long occuredTime;
-        private String authToken;
-        private String tenantId;
-        private String hostName;
-        private String basePath;
-
-
-        public MessagingEvents getEvent() {
-            return event;
-        }
-
-        public void setEvent(MessagingEvents event) {
-            this.event = event;
-        }
-
-        public Long getOccuredTime() {
-            return occuredTime;
-        }
-
-        public void setOccuredTime(Long occuredTime) {
-            this.occuredTime = occuredTime;
-        }
-
-        public String getAuthToken() {
-            return authToken;
-        }
-
-        public void setAuthToken(String authToken) {
-            this.authToken = authToken;
-        }
-
-        public String getTenantId() {
-            return tenantId;
-        }
-
-        public void setTenantId(String tenantId) {
-            this.tenantId = tenantId;
-        }
+    public String getTenantId() {
+        return tenantId;
+    }
 
-        public String getBasePath() {
-            return basePath;
-        }
+    public void setTenantId(String tenantId) {
+        this.tenantId = tenantId;
+    }
 
-        public void setBasePath(String basePath) {
-            this.basePath = basePath;
-        }
+    public String getHostName() {
+        return hostName;
+    }
 
-        public String getHostName() {
-            return hostName;
-        }
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
 
-        public void setHostName(String hostName) {
-            this.hostName = hostName;
-        }
+    public String getBasePath() {
+        return basePath;
     }
 
-    public String getResourceId() {
-        return context.hostName+ ":" + resourcePath + ":" + resourceType;
+    public void setBasePath(String basePath) {
+        this.basePath = basePath;
     }
 
+    public Type getEventType() {
+        return eventType;
+    }
 
+    public void setEventType(Type eventType) {
+        this.eventType = eventType;
+    }
 }
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index f19d272..d8d834a 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -1,6 +1,5 @@
 package org.apache.airavata.dataorchestrator.messaging.model;
 
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.Map;
@@ -19,18 +18,14 @@ public class NotificationEventDeserializer implements Deserializer<NotificationE
         String deserialized = new String(bytes);
         String parts[] = deserialized.split(",");
         NotificationEvent event = new NotificationEvent();
-        NotificationEvent.Context context = new NotificationEvent.Context();
-        event.setId(parts[0]);
-        context.setEvent(MessagingEvents.valueOf(parts[1]));
-        context.setOccuredTime(Long.valueOf(parts[2]));
-        context.setAuthToken(String.valueOf(parts[3]));
-        context.setTenantId(String.valueOf(parts[4]));
-        context.setHostName(parts[5]);
-        context.setBasePath(parts[6]);
-        event.setResourcePath(parts[7]);
-        event.setResourceType(parts[8]);
-        event.setResourceName(parts[9]);
-        event.setContext(context);
+        event.setResourcePath(parts[0]);
+        event.setResourceType(parts[1]);
+        event.setOccuredTime(Long.valueOf(parts[2]));
+        event.setTenantId(parts[3]);
+        event.setHostName(parts[4]);
+        event.setBasePath(parts[5]);
+        event.setEventType(NotificationEvent.Type.valueOf(parts[6]));
+        event.setAuthToken(parts[7]);
         return event;
     }
 
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index 7a55770..c92d181 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -15,16 +15,14 @@ public class NotificationEventSerializer implements Serializer<NotificationEvent
 
     @Override
     public byte[] serialize(String s, NotificationEvent notificationEvent) {
-        String serializedData = notificationEvent.getId() + "," +
-                notificationEvent.getContext().getEvent().name() + "," +
-                notificationEvent.getContext().getOccuredTime() + "," +
-                notificationEvent.getContext().getAuthToken() + "," +
-                notificationEvent.getContext().getTenantId() + "," +
-                notificationEvent.getContext().getHostName() + "," +
-                notificationEvent.getContext().getBasePath() + "," +
-                notificationEvent.getResourcePath() + "," +
+        String serializedData = notificationEvent.getResourcePath() + "," +
                 notificationEvent.getResourceType() + "," +
-                notificationEvent.getResourceName();
+                notificationEvent.getOccuredTime() + "," +
+                notificationEvent.getTenantId() + "," +
+                notificationEvent.getHostName() + "," +
+                notificationEvent.getBasePath() + "," +
+                notificationEvent.getEventType() + "," +
+                notificationEvent.getAuthToken();
         return serializedData.getBytes();
     }
 
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
index e4aa253..4abded7 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
@@ -30,7 +30,7 @@ public class MessageProducer {
     public void publish(String topic, NotificationEvent notificationMessage, Callback callback) throws ExecutionException, InterruptedException {
         try {
             final ProducerRecord<String, NotificationEvent> record = new ProducerRecord<>(topic,
-                    notificationMessage.getId(),
+                    notificationMessage.getResourcePath(),
                     notificationMessage);
             producer.send(record, callback).get();
         } finally {
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java
deleted file mode 100644
index d1843fd..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-import java.util.Set;
-
-/**
- * DataOrchestrator entity
- */
-@Entity
-@Table(name = "DATAORCHESTRATOR_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class DataOrchestratorEntity {
-
-    @Id
-    private String id;
-
-    @Column(nullable = false)
-    private String resourceId;
-
-    @Column(nullable = false)
-    private String resourcePath;
-
-    @Column(nullable = false)
-    private String resourceName;
-
-    @Column(nullable = false)
-    private String resourceType;
-
-    @Column(nullable = false)
-    private Date occurredTime;
-
-    @Column(nullable = false)
-    @Temporal(TemporalType.TIMESTAMP)
-    @CreatedDate
-    private Date createdAt;
-
-    @Column(nullable = false)
-    @Temporal(TemporalType.TIMESTAMP)
-    @LastModifiedDate
-    private Date lastModifiedAt;
-
-    @Column(nullable = false)
-    private String eventStatus;
-
-    @Column(nullable = false)
-    private String eventType;
-
-    @Column(nullable = false)
-    private String tenantId;
-
-    @Column(nullable = false)
-    private String agentId;
-
-    @Column(nullable = false)
-    private String authToken;
-
-    @Column(nullable = false)
-    private String hostName;
-
-    @Lob
-    private String error;
-
-    @OneToMany(fetch = FetchType.EAGER, mappedBy = "dataOrchestratorEntity", orphanRemoval = true, cascade = CascadeType.ALL)
-    private Set<WorkflowEntity> workFlowEntities;
-
-    @OneToMany(fetch = FetchType.EAGER, mappedBy = "dataOrchestratorEntity", orphanRemoval = true, cascade = CascadeType.ALL)
-    private Set<OwnershipEntity> ownershipEntities;
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getResourceId() {
-        return resourceId;
-    }
-
-    public void setResourceId(String resourceId) {
-        this.resourceId = resourceId;
-    }
-
-    public String getResourcePath() {
-        return resourcePath;
-    }
-
-    public void setResourcePath(String resourcePath) {
-        this.resourcePath = resourcePath;
-    }
-
-    public String getResourceName() {
-        return resourceName;
-    }
-
-    public void setResourceName(String resourceName) {
-        this.resourceName = resourceName;
-    }
-
-    public String getResourceType() {
-        return resourceType;
-    }
-
-    public void setResourceType(String resourceType) {
-        this.resourceType = resourceType;
-    }
-
-    public Date getOccurredTime() {
-        return occurredTime;
-    }
-
-    public void setOccurredTime(Date occurredTime) {
-        this.occurredTime = occurredTime;
-    }
-
-    public Date getCreatedAt() {
-        return createdAt;
-    }
-
-    public void setCreatedAt(Date createdAt) {
-        this.createdAt = createdAt;
-    }
-
-    public Date getLastModifiedAt() {
-        return lastModifiedAt;
-    }
-
-    public void setLastModifiedAt(Date lastModifiedAt) {
-        this.lastModifiedAt = lastModifiedAt;
-    }
-
-    public String getEventStatus() {
-        return eventStatus;
-    }
-
-    public void setEventStatus(String status) {
-        this.eventStatus = status;
-    }
-
-    public Set<WorkflowEntity> getWorkFlowEntities() {
-        return workFlowEntities;
-    }
-
-    public void setWorkFlowEntities(Set<WorkflowEntity> workFlowEntities) {
-        this.workFlowEntities = workFlowEntities;
-    }
-
-    public String getError() {
-        return error;
-    }
-
-    public void setError(String error) {
-        this.error = error;
-    }
-
-    public String getEventType() {
-        return eventType;
-    }
-
-    public void setEventType(String eventType) {
-        this.eventType = eventType;
-    }
-
-    public String getTenantId() {
-        return tenantId;
-    }
-
-    public void setTenantId(String tenantId) {
-        this.tenantId = tenantId;
-    }
-
-    public String getAgentId() {
-        return agentId;
-    }
-
-    public void setAgentId(String agentId) {
-        this.agentId = agentId;
-    }
-
-    public String getAuthToken() {
-        return authToken;
-    }
-
-    public void setAuthToken(String authToken) {
-        this.authToken = authToken;
-    }
-
-    public String getHostName() {
-        return hostName;
-    }
-
-    public void setHostName(String hostName) {
-        this.hostName = hostName;
-    }
-
-    public Set<OwnershipEntity> getOwnershipEntities() {
-        return ownershipEntities;
-    }
-
-    public void setOwnershipEntities(Set<OwnershipEntity> ownershipEntities) {
-        this.ownershipEntities = ownershipEntities;
-    }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java
deleted file mode 100644
index ec6e4eb..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-
-@Entity
-@Table(name = "OWNERSHIP_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class OwnershipEntity {
-
-    @Id
-    private String id;
-
-    @Column(name = "USER_ID")
-    private String userId;
-
-    @Column(name = "PERMISSION_ID")
-    private String permissionId;
-
-    @ManyToOne
-    @JoinColumn(name = "DATA_ORCHESTRATOR_ENTITY_ID")
-    private DataOrchestratorEntity dataOrchestratorEntity;
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getUserId() {
-        return userId;
-    }
-
-    public void setUserId(String userId) {
-        this.userId = userId;
-    }
-
-    public String getPermissionId() {
-        return permissionId;
-    }
-
-    public void setPermissionId(String permissionId) {
-        this.permissionId = permissionId;
-    }
-
-    public DataOrchestratorEntity getDataOrchestratorEntity() {
-        return dataOrchestratorEntity;
-    }
-
-    public void setDataOrchestratorEntity(DataOrchestratorEntity dataOrchestratorEntity) {
-        this.dataOrchestratorEntity = dataOrchestratorEntity;
-    }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java
deleted file mode 100644
index d585020..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-import java.util.Set;
-
-/**
- * An workflow class that represents the workflow entity
- */
-@Entity
-@Table(name = "WORKFLOW_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class WorkflowEntity {
-
-    @Id
-    private String id;
-
-    @Column(nullable = false)
-    @Temporal(TemporalType.TIMESTAMP)
-    @CreatedDate
-    private Date createdAt;
-
-
-    @Column(nullable = false)
-    @Temporal(TemporalType.TIMESTAMP)
-    @LastModifiedDate
-    private Date lastModifiedAt;
-
-    @Column(nullable = false)
-    private String status;
-
-
-    @ManyToOne
-    @JoinColumn(name = "dataorchestrator_entity_id")
-    private DataOrchestratorEntity dataOrchestratorEntity;
-
-    @OneToMany(fetch = FetchType.EAGER, mappedBy = "workflowEntity", orphanRemoval = true, cascade = CascadeType.ALL)
-    private Set<WorkflowTaskEntity> workflowTaskEntities;
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public Date getCreatedAt() {
-        return createdAt;
-    }
-
-    public void setCreatedAt(Date createdAt) {
-        this.createdAt = createdAt;
-    }
-
-    public Date getLastModifiedAt() {
-        return lastModifiedAt;
-    }
-
-    public void setLastModifiedAt(Date lastModifiedAt) {
-        this.lastModifiedAt = lastModifiedAt;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    public DataOrchestratorEntity getDataOrchestratorEntity() {
-        return dataOrchestratorEntity;
-    }
-
-    public void setDataOrchestratorEntity(DataOrchestratorEntity dataOrchestratorEntity) {
-        this.dataOrchestratorEntity = dataOrchestratorEntity;
-    }
-
-    public Set<WorkflowTaskEntity> getWorkflowTaskEntities() {
-        return workflowTaskEntities;
-    }
-
-    public void setWorkflowTaskEntities(Set<WorkflowTaskEntity> workflowTaskEntities) {
-        this.workflowTaskEntities = workflowTaskEntities;
-    }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java
deleted file mode 100644
index b20ddd8..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.WorkflowEntity;
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-
-/**
- * An entity class represents the task entity
- */
-@Entity
-@Table(name = "WORKFLOW_TASK_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class WorkflowTaskEntity {
-
-    @Id
-    private String id;
-
-    @ManyToOne
-    @JoinColumn(name = "workflow_entity_id")
-    private WorkflowEntity workflowEntity;
-
-
-    @Column(nullable = false)
-    @Temporal(TemporalType.TIMESTAMP)
-    @CreatedDate
-    private Date createdAt;
-
-
-    @Column(nullable = false)
-    @Temporal(TemporalType.TIMESTAMP)
-    @LastModifiedDate
-    private Date lastModifiedAt;
-
-    @Column(nullable = false)
-    private String status;
-
-    @Lob
-    private String error;
-
-    private int errorCode;
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public WorkflowEntity getWorkflowEntity() {
-        return workflowEntity;
-    }
-
-    public void setWorkflowEntity(WorkflowEntity workFlowEntity) {
-        this.workflowEntity = workFlowEntity;
-    }
-
-    public Date getCreatedAt() {
-        return createdAt;
-    }
-
-    public void setCreatedAt(Date createdAt) {
-        this.createdAt = createdAt;
-    }
-
-    public Date getLastModifiedAt() {
-        return lastModifiedAt;
-    }
-
-    public void setLastModifiedAt(Date lastModifiedAt) {
-        this.lastModifiedAt = lastModifiedAt;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    public String getError() {
-        return error;
-    }
-
-    public void setError(String error) {
-        this.error = error;
-    }
-
-    public int getErrorCode() {
-        return errorCode;
-    }
-
-    public void setErrorCode(int errorCode) {
-        this.errorCode = errorCode;
-    }
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
index 27aa4fd..b37c123 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
@@ -36,6 +36,9 @@ public class DataParsingJobOutputEntity {
     @Column(name = "OUTPUT_TYPE")
     private DataParsingJobOutputType outputType;
 
+    @Column(name = "OUTPUT_DIR_RESOURCE_ID")
+    private String outputDirectoryResourceId;
+
     @ManyToOne(fetch = FetchType.LAZY)
     @JoinColumn(name = "DATA_PARSER_OUTPUT_INTERFACE_ID", insertable=false, updatable=false)
     private DataParserOutputInterfaceEntity dataParserOutputInterface;
@@ -91,4 +94,12 @@ public class DataParsingJobOutputEntity {
     public void setDataParsingJobEntity(DataParsingJobEntity dataParsingJobEntity) {
         this.dataParsingJobEntity = dataParsingJobEntity;
     }
+
+    public String getOutputDirectoryResourceId() {
+        return outputDirectoryResourceId;
+    }
+
+    public void setOutputDirectoryResourceId(String outputDirectoryResourceId) {
+        this.outputDirectoryResourceId = outputDirectoryResourceId;
+    }
 }
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java
deleted file mode 100644
index 43f2eb9..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.data.jpa.repository.Query;
-
-import java.util.List;
-
-public interface DataOrchestratorEventRepository extends JpaRepository<DataOrchestratorEntity, String> {
-
-    @Query(value = "select * from DATAORCHESTRATOR_ENTITY s where s.eventStatus = ?1  ORDER BY occurredTime DESC", nativeQuery = true)
-    public List<DataOrchestratorEntity> findAllEntitiesWithGivenStatus(String eventStatus);
-
-
-}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java
deleted file mode 100644
index 4014186..0000000
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.WorkflowEntity;
-import org.springframework.data.jpa.repository.JpaRepository;
-
-public interface WorkflowEntityRepository extends JpaRepository<WorkflowEntity, String> {
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
index d728ef1..52494e5 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
@@ -153,6 +153,8 @@ public class Configuration {
         private String drmsHost;
         private int drmsPort;
         private int pollingInterval;
+        private String mftHost;
+        private int mftPort;
 
 
         public OutboundEventProcessorConfig() {
@@ -198,6 +200,22 @@ public class Configuration {
         public void setPollingInterval(int pollingInterval) {
             this.pollingInterval = pollingInterval;
         }
+
+        public String getMftHost() {
+            return mftHost;
+        }
+
+        public void setMftHost(String mftHost) {
+            this.mftHost = mftHost;
+        }
+
+        public int getMftPort() {
+            return mftPort;
+        }
+
+        public void setMftPort(int mftPort) {
+            this.mftPort = mftPort;
+        }
     }
 
 
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index e24eb25..96fac88 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -11,10 +11,6 @@ import org.apache.airavata.datalake.drms.sharing.ShareEntityWithUserRequest;
 import org.apache.airavata.datalake.drms.storage.*;
 import org.apache.airavata.datalake.orchestrator.Configuration;
 import org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,50 +56,36 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
         return !this.drmsChannel.isShutdown();
     }
 
-    public void shareWithUser(DataOrchestratorEntity entity) throws Exception {
-
-        Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
-        if (adminOp.isEmpty()) {
-            throw new Exception("No admin user found");
-        }
+    public void shareWithUser(String authToken, String tenantId, String admin, String user, String resourceId, String permission) throws Exception {
 
         DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-                .setAccessToken(entity.getAuthToken())
+                .setAccessToken(authToken)
                 .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
                 .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-                        .setUsername(adminOp.get().getUserId())
-                        .setTenantId(entity.getTenantId())
+                        .setUsername(admin)
+                        .setTenantId(tenantId)
                         .build())
                 .build();
 
-        for (OwnershipEntity ownershipEntity : entity.getOwnershipEntities()) {
-            if (ownershipEntity.getPermissionId().equals("ADMIN")) {
-                continue;
-            }
-            ShareEntityWithUserRequest.Builder shareBuilder = ShareEntityWithUserRequest.newBuilder()
-                    .setAuthToken(serviceAuthToken)
-                    .setEntityId(entity.getResourceId())
-                    .setSharedUserId(ownershipEntity.getUserId())
-                    .setPermissionId(ownershipEntity.getPermissionId());
-
-            this.sharingServiceBlockingStub.shareEntityWithUser(shareBuilder.build());
-        }
+        ShareEntityWithUserRequest.Builder shareBuilder = ShareEntityWithUserRequest.newBuilder()
+                .setAuthToken(serviceAuthToken)
+                .setEntityId(resourceId)
+                .setSharedUserId(user)
+                .setPermissionId(permission);
 
-    }
+        this.sharingServiceBlockingStub.shareEntityWithUser(shareBuilder.build());
 
-    public Optional<TransferMapping> getActiveTransferMapping(DataOrchestratorEntity entity, String hostname) throws Exception {
+    }
 
-        Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
-        if (adminOp.isEmpty()) {
-            throw new Exception("No admin user found");
-        }
+    public Optional<TransferMapping> getActiveTransferMapping(String authToken, String tenantId,
+                                                              String user, String hostName) throws Exception {
 
         DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-                .setAccessToken(entity.getAuthToken())
+                .setAccessToken(authToken)
                 .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
                 .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-                        .setUsername(adminOp.get().getUserId())
-                        .setTenantId(entity.getTenantId())
+                        .setUsername(user)
+                        .setTenantId(tenantId)
                         .build())
                 .build();
         FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
@@ -116,7 +98,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
             transferMappingList.forEach(transferMapping -> {
                 if (transferMapping.getSourceStorage().getStorageCase()
                         .equals(AnyStorage.StorageCase.SSH_STORAGE)) {
-                    if (transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname)) {
+                    if (transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostName)) {
                         transferMappingOp.set(transferMapping);
                     }
                 }
@@ -126,24 +108,22 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
     }
 
 
-    public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
+    public Optional<GenericResource> createResource(String authToken,
+                                                    String tenantId,
                                                     String resourceId,
                                                     String resourceName,
                                                     String resourcePath,
                                                     String parentId,
-                                                    String type, String parentType) throws Exception {
+                                                    String type, String parentType,
+                                                    String user) throws Exception {
 
-        Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
-        if (adminOp.isEmpty()) {
-            throw new Exception("No admin user found");
-        }
 
         DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-                .setAccessToken(entity.getAuthToken())
+                .setAccessToken(authToken)
                 .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
                 .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-                        .setUsername(adminOp.get().getUserId())
-                        .setTenantId(entity.getTenantId())
+                        .setUsername(user)
+                        .setTenantId(tenantId)
                         .build())
                 .build();
 
@@ -165,10 +145,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
             ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
             return Optional.ofNullable(resourceCreateResponse.getResource());
         } catch (Exception ex) {
-            LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
-            entity.setEventStatus(EventStatus.ERRORED.name());
-            entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
-            repository.save(entity);
+            LOGGER.error("Error occurred while creating resource {} in DRMS", resourcePath, ex);
             return Optional.empty();
         }
     }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
index b95ccb3..d571568 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
@@ -10,6 +10,8 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowService
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * Drms connector to call DRMS services
  */
@@ -43,11 +45,11 @@ public class WorkflowServiceConnector implements AbstractConnector<Configuration
         return false;
     }
 
-    public void invokeWorkflow(String authToken, String username, String tenantId, String sourceResourceId, String sourceCredentialToken,
+    public void invokeWorkflow(String authToken, String username, String tenantId, List<String> sourceResourceIds, String sourceCredentialToken,
                                String dstResourceId, String destinationCredentialToken) {
         try {
             WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
-                    .setSourceResourceId(sourceResourceId)
+                    .addAllSourceResourceIds(sourceResourceIds)
                     .setDestinationResourceId(dstResourceId)
                     .setUsername(username)
                     .setTenantId(tenantId)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index ff5fa67..d6d25bc 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -18,9 +18,6 @@
 package org.apache.airavata.datalake.orchestrator.handlers.async;
 
 import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.processor.InboundEventProcessor;
-import org.apache.airavata.datalake.orchestrator.processor.OutboundEventProcessor;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
 import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +27,6 @@ import org.springframework.stereotype.Component;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Orchestrator event handler
@@ -45,10 +41,6 @@ public class OrchestratorEventHandler {
     private ScheduledExecutorService ouboundExecutorService;
     private MessageConsumer messageConsumer;
 
-    @Autowired
-    private DataOrchestratorEventRepository dataOrchestratorEventRepository;
-
-
     public OrchestratorEventHandler() {
     }
 
@@ -60,20 +52,18 @@ public class OrchestratorEventHandler {
                 configuration.getConsumer().getConsumerGroup(),
                 configuration.getConsumer().getMaxPollRecordsConfig(),
                 configuration.getConsumer().getTopic());
-
     }
 
     public void startProcessing() throws Exception {
         messageConsumer.consume((notificationEvent -> {
-            LOGGER.info("Message received " + notificationEvent.getResourceName());
-            LOGGER.info("Submitting {} to process in thread pool", notificationEvent.getId());
-            this.executorService.submit(new InboundEventProcessor(configuration, notificationEvent, dataOrchestratorEventRepository));
-
+            LOGGER.info("Message received for resource path {}", notificationEvent.getResourcePath());
+            try {
+                this.executorService.submit(new OrchestratorEventProcessor(configuration, notificationEvent));
+            } catch (Exception e) {
+                LOGGER.error("Failed tu submit data orchestrator event to process on path {}",
+                        notificationEvent.getResourcePath(), e);
+            }
         }));
-
-        this.ouboundExecutorService
-                .scheduleAtFixedRate(new OutboundEventProcessor(configuration, dataOrchestratorEventRepository),
-                        0, configuration.getOutboundEventProcessor().getPollingInterval(), TimeUnit.SECONDS);
     }
 
     public Configuration getConfiguration() {
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
new file mode 100644
index 0000000..6fc7d45
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.handlers.async;
+
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
+import org.apache.airavata.datalake.drms.storage.TransferMapping;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
+import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
+import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
+import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.DirectoryMetadataResponse;
+import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
+import org.apache.airavata.mft.api.service.FileMetadataResponse;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class OrchestratorEventProcessor implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(OrchestratorEventProcessor.class);
+
+    private NotificationEvent notificationEvent;
+
+    private DRMSConnector drmsConnector;
+    private Configuration configuration;
+    private WorkflowServiceConnector workflowServiceConnector;
+
+    public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent) throws Exception {
+        this.notificationEvent = notificationEvent;
+        this.drmsConnector = new DRMSConnector(configuration);
+        this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
+        this.configuration = configuration;
+    }
+
+    private List<GenericResource> createResourceRecursively(String storageId, String basePath, String resourcePath, String resourceType, String user)
+            throws Exception{
+
+        List<GenericResource> resourceList = new ArrayList<>();
+
+        String parentType = "Storage";
+
+        String[] splitted = resourcePath.substring(basePath.length()).split("/");
+
+        String currentPath = basePath.endsWith("/")? basePath.substring(0, basePath.length() -1): basePath;
+        String parentId = storageId;
+        for (int i = 0; i < splitted.length - 1; i++) {
+            String resourceName = splitted[i];
+            currentPath = currentPath + "/" + resourceName;
+            String resourceId = Utils.getId(storageId + ":" + currentPath);
+            Optional<GenericResource> optionalGenericResource =
+                    this.drmsConnector.createResource(notificationEvent.getAuthToken(),
+                            notificationEvent.getTenantId(),
+                            resourceId, resourceName, currentPath, parentId, "COLLECTION", parentType, user);
+            if (optionalGenericResource.isPresent()) {
+                parentId = optionalGenericResource.get().getResourceId();
+                parentType = "COLLECTION";
+                resourceList.add(optionalGenericResource.get());
+            } else {
+                logger.error("Could not create a resource for path {}", currentPath);
+                throw new Exception("Could not create a resource for path " + currentPath);
+            }
+        }
+
+        currentPath = currentPath + "/" + splitted[splitted.length -1];
+
+        Optional<GenericResource> optionalGenericResource =
+                this.drmsConnector.createResource(notificationEvent.getAuthToken(),
+                        notificationEvent.getTenantId(),
+                        Utils.getId(storageId + ":" + currentPath),
+                        splitted[splitted.length -1], currentPath,
+                        parentId, resourceType, parentType, user);
+
+        if (optionalGenericResource.isPresent()) {
+            resourceList.add(optionalGenericResource.get());
+        } else {
+            logger.error("Could not create a resource for path {}", currentPath);
+            throw new Exception("Could not create a resource for path " + currentPath);
+        }
+
+        return resourceList;
+    }
+
+
+    private void shareResources(List<GenericResource> resourceList, String admin, String user, String permission) throws Exception {
+        for (GenericResource resource : resourceList) {
+            logger.info("Sharing resource {} with path {} with user {}",
+                    resource.getResourceId(), resource.getResourcePath(), user);
+            this.drmsConnector.shareWithUser(notificationEvent.getAuthToken(), notificationEvent.getTenantId(), admin, user, resource.getResourceId(), permission);
+        }
+    }
+
+    @Override
+    public void run() {
+        logger.info("Processing resource path {} on storage {}", notificationEvent.getResourcePath(),
+                notificationEvent.getBasePath());
+
+        try {
+
+            if (!"FOLDER".equals(notificationEvent.getResourceType())) {
+                logger.error("Resource {} should be a Folder type but got {}",
+                        notificationEvent.getResourcePath(),
+                        notificationEvent.getResourceType());
+                logger.error("Resource should be a Folder type");
+            }
+            String removeBasePath = notificationEvent.getResourcePath().substring(notificationEvent.getBasePath().length());
+            String[] splitted = removeBasePath.split("/");
+
+            String adminUser = splitted[0];
+            String owner = splitted[1].split("_")[0];
+
+            Map<String, String> ownerRules = new HashMap<>();
+            ownerRules.put(adminUser, "ADMIN");
+            ownerRules.put(splitted[1], "OWNER");
+
+            Optional<TransferMapping> optionalTransferMapping = drmsConnector.getActiveTransferMapping(
+                    notificationEvent.getAuthToken(),
+                    notificationEvent.getTenantId(), adminUser,
+                    notificationEvent.getHostName());
+
+            if (optionalTransferMapping.isEmpty()) {
+                logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notificationEvent.getHostName());
+                throw new Exception("Could not find a transfer mapping");
+            }
+
+            TransferMapping transferMapping = optionalTransferMapping.get();
+
+            String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
+            String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
+
+            // Creating parent resource
+
+            List<GenericResource> resourceList = createResourceRecursively(sourceStorageId,
+                    notificationEvent.getBasePath(),
+                    notificationEvent.getResourcePath(),
+                    "COLLECTION", adminUser);
+
+            shareResources(Collections.singletonList(resourceList.get(resourceList.size() -1)), adminUser, owner, "ADMIN");
+
+            GenericResource resourceObj = resourceList.get(resourceList.size() -1);
+
+            Optional<AnyStoragePreference> sourceSPOp = this.drmsConnector.getStoragePreference(
+                    notificationEvent.getAuthToken(), adminUser,
+                    notificationEvent.getTenantId(), sourceStorageId);
+
+            if (sourceSPOp.isEmpty()) {
+                logger.error("No storage preference found for source storage {} and user {}", sourceStorageId, adminUser);
+                throw new Exception("No storage preference found for source storage");
+            }
+
+            Optional<AnyStoragePreference> destSPOp = this.drmsConnector.getStoragePreference(
+                    notificationEvent.getAuthToken(), adminUser,
+                    notificationEvent.getTenantId(), destinationStorageId);
+
+            if (destSPOp.isEmpty()) {
+                logger.error("No storage preference found for destination storage {} and user {}", sourceStorageId, adminUser);
+                throw new Exception("No storage preference found for destination storage");
+            }
+
+            AnyStoragePreference sourceSP = sourceSPOp.get();
+            AnyStoragePreference destSP = destSPOp.get();
+
+            MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(
+                    this.configuration.getOutboundEventProcessor().getMftHost(),
+                    this.configuration.getOutboundEventProcessor().getMftPort());
+
+            String decodedAuth = new String(Base64.getDecoder().decode(notificationEvent.getAuthToken()));
+            String[] authParts = decodedAuth.split(":");
+
+            if (authParts.length != 2) {
+                throw new Exception("Could not decode auth token to work with MFT");
+            }
+
+            DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+                    .setUserId(adminUser)
+                    .setClientId(authParts[0])
+                    .setClientSecret(authParts[1])
+                    .putProperties("TENANT_ID", notificationEvent.getTenantId()).build();
+
+            AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
+
+            FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
+                    .setMftAuthorizationToken(mftAuth)
+                    .setResourceId(resourceObj.getResourceId());
+
+            switch (sourceSP.getStorageCase()){
+                case SSH_STORAGE_PREFERENCE:
+                    resourceMetadataReq.setResourceType("SCP");
+                    resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getCredentialToken());
+                    break;
+                case S3_STORAGE_PREFERENCE:
+                    resourceMetadataReq.setResourceType("S3");
+                    resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getCredentialToken());
+                    break;
+            }
+
+            // Fetching file list for parent resource
+
+            DirectoryMetadataResponse directoryResourceMetadata = mftClient.getDirectoryResourceMetadata(resourceMetadataReq.build());
+
+            List<String> resourceIDsToProcess = new ArrayList<>();
+            for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
+                logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
+                resourceList = createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+                        fileMetadata.getResourcePath(), "FILE", adminUser);
+                GenericResource fileResource = resourceList.get(resourceList.size() -1);
+
+                resourceIDsToProcess.add(fileResource.getResourceId());
+            }
+
+            for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
+                logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
+                createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+                        directoryMetadata.getResourcePath(),
+                        "COLLECTION", adminUser);
+                // TODO scan directories
+            }
+
+            logger.info("Creating destination zip resource for directory {}", notificationEvent.getResourcePath());
+            resourceList = createResourceRecursively(destinationStorageId, notificationEvent.getBasePath(),
+                    notificationEvent.getResourcePath(), "FILE", adminUser);
+
+            GenericResource destinationResource = resourceList.get(resourceList.size() -1);
+
+            System.out.println(destinationResource);
+
+            logger.info("Submitting resources to workflow manager");
+            this.workflowServiceConnector.invokeWorkflow(notificationEvent.getAuthToken(), adminUser,
+                    notificationEvent.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getCredentialToken(),
+                    destinationResource.getResourceId(), destSP.getSshStoragePreference().getCredentialToken());
+
+
+            logger.info("Completed processing path {}", notificationEvent.getResourcePath());
+
+        } catch (Exception e) {
+            logger.error("Failed to process event for resource path {}", notificationEvent.getResourcePath(), e);
+        }
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
deleted file mode 100644
index 62bd6ff..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.processor;
-
-import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.Utils;
-import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.dozer.DozerBeanMapper;
-import org.dozer.loader.api.BeanMappingBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * This class is responsible for pick events from kafka queue and publish them into inmemory store
- */
-public class InboundEventProcessor implements MessageProcessor<Configuration> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(InboundEventProcessor.class);
-    private Configuration configuration;
-    private NotificationEvent notificationEvent;
-    private DozerBeanMapper dozerBeanMapper;
-
-    private DataOrchestratorEventRepository repository;
-
-    public InboundEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
-                                 DataOrchestratorEventRepository repository) throws Exception {
-        this.notificationEvent = notificationEvent;
-        this.repository = repository;
-        this.init(configuration);
-    }
-
-    @Override
-    public void init(Configuration configuration) throws Exception {
-        try {
-            this.configuration = configuration;
-            dozerBeanMapper = new DozerBeanMapper();
-            BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
-                @Override
-                protected void configure() {
-                    mapping(NotificationEvent.class, DataOrchestratorEntity.class);
-                }
-            };
-            dozerBeanMapper.addMapping(orchestratorEventMapper);
-        } catch (Exception exception) {
-            LOGGER.error(" Error occurred while initiating Inbound event processor ", exception);
-            throw exception;
-        }
-
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-
-    @Override
-    public void run() {
-        try {
-            LOGGER.info("Inbound event processor received event " + notificationEvent.getResourceId());
-            String typeStr = this.configuration.getMessageFilter().getResourceType();
-            String[] allowedTypes = typeStr.split(",");
-            boolean proceed = false;
-            long size = Arrays.stream(allowedTypes).filter(type ->
-                    type.equals(notificationEvent.getResourceType())).count();
-            if (size == 0) {
-                return;
-            }
-            String eventTypeStr = this.configuration.getMessageFilter().getEventType();
-            String[] eventTypes = eventTypeStr.split(",");
-            long eventSize = Arrays.stream(eventTypes).filter(type ->
-                    type.trim().equals(notificationEvent.getContext().getEvent().name())).count();
-            if (eventSize == 0) {
-                return;
-            }
-
-            String pattern = this.configuration.getMessageFilter().getResourceNameExclusions();
-
-            // Create a Pattern object
-            Pattern r = Pattern.compile(pattern);
-
-            // Now create matcher object.
-            Matcher m = r.matcher(notificationEvent.getResourceName());
-
-            if (m.find()) {
-                return;
-            }
-
-            DataOrchestratorEntity entity = createEntity(notificationEvent);
-            repository.save(entity);
-        } catch (Exception exception) {
-            LOGGER.error("Error occurred while pre processing event {}", this.notificationEvent.getResourceId(), exception);
-        }
-
-    }
-
-    private DataOrchestratorEntity createEntity(NotificationEvent event) throws NoSuchAlgorithmException {
-        DataOrchestratorEntity entity = dozerBeanMapper.map(event, DataOrchestratorEntity.class);
-        entity.setOccurredTime(new Date(event.getContext().getOccuredTime()));
-        entity.setEventStatus(EventStatus.DATA_ORCH_RECEIVED.name());
-        entity.setEventType(event.getContext().getEvent().name());
-        entity.setAuthToken(event.getContext().getAuthToken());
-        entity.setHostName(event.getContext().getHostName());
-
-        String resourcePath = event.getResourcePath();
-        String basePath = event.getContext().getBasePath();
-        String removeBasePath = resourcePath.substring(basePath.length());
-        String[] splitted = removeBasePath.split("/");
-
-        OwnershipEntity owner1 = new OwnershipEntity();
-        owner1.setId(UUID.randomUUID().toString());
-        owner1.setUserId(splitted[1]);
-        owner1.setPermissionId("OWNER");
-        owner1.setDataOrchestratorEntity(entity);
-
-        OwnershipEntity owner2 = new OwnershipEntity();
-        owner2.setId(UUID.randomUUID().toString());
-        owner2.setUserId(splitted[0]);
-        owner2.setPermissionId("ADMIN");
-        owner2.setDataOrchestratorEntity(entity);
-
-
-        Set<OwnershipEntity> owners = new HashSet<>();
-        owners.add(owner1);
-        owners.add(owner2);
-
-        entity.setOwnershipEntities(owners);
-
-        entity.setTenantId(event.getContext().getTenantId());
-
-        String authDecoded = new String(Base64.getDecoder()
-                .decode(event.getContext().getAuthToken().getBytes(StandardCharsets.UTF_8)));
-        String agentId = authDecoded.split(":")[0];
-        entity.setAgentId(agentId);
-        entity.setResourceId(Utils.getId(event.getResourceId()));
-        return entity;
-    }
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
deleted file mode 100644
index 4b04005..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ /dev/null
@@ -1,197 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.processor;
-
-import org.apache.airavata.datalake.drms.resource.GenericResource;
-import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
-import org.apache.airavata.datalake.drms.storage.TransferMapping;
-import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.Utils;
-import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
-import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
-import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.dozer.DozerBeanMapper;
-import org.dozer.loader.api.BeanMappingBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * This class is responsible  and publish events to registry and
- * Workflow engine
- */
-public class OutboundEventProcessor implements MessageProcessor<Configuration> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(OutboundEventProcessor.class);
-
-    private DozerBeanMapper dozerBeanMapper;
-    private DataOrchestratorEventRepository repository;
-
-    private DRMSConnector drmsConnector;
-    private WorkflowServiceConnector workflowServiceConnector;
-
-    public OutboundEventProcessor(Configuration configuration, DataOrchestratorEventRepository repository) throws Exception {
-        this.repository = repository;
-        this.init(configuration);
-    }
-
-    @Override
-    public void init(Configuration configuration) throws Exception {
-        this.drmsConnector = new DRMSConnector(configuration);
-        this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
-        dozerBeanMapper = new DozerBeanMapper();
-        BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
-            @Override
-            protected void configure() {
-                mapping(NotificationEvent.class, DataOrchestratorEntity.class);
-            }
-        };
-        dozerBeanMapper.addMapping(orchestratorEventMapper);
-
-    }
-
-    @Override
-    public void close() throws Exception {
-        this.drmsConnector.close();
-        this.workflowServiceConnector.close();
-    }
-
-
-    @Override
-    public void run() {
-
-        try {
-            List<DataOrchestratorEntity> orchestratorEntityList = this.repository
-                    .findAllEntitiesWithGivenStatus(EventStatus.DATA_ORCH_RECEIVED.name());
-            Map<String, List<DataOrchestratorEntity>> entityMap = new HashMap<>();
-            orchestratorEntityList.forEach(entity -> {
-                entityMap.computeIfAbsent(entity.getResourceId(), list -> new ArrayList()).add(entity);
-            });
-            entityMap.forEach((key, value) -> {
-                try {
-                    DataOrchestratorEntity entity = value.remove(0);
-                    processEvent(entity);
-                    value.forEach(val -> {
-                        val.setEventStatus(EventStatus.DATA_ORCH_PROCESSED_AND_SKIPPED.name());
-                        repository.save(val);
-                    });
-                } catch (Exception e) {
-                    LOGGER.error("Errored while processing event", e);
-                }
-            });
-        } catch (Exception ex) {
-            LOGGER.error("Error while processing events", ex);
-        }
-
-    }
-
-    private void processEvent(DataOrchestratorEntity entity) {
-        try {
-
-            // TODO move this logic to file listener as this is EMC specific
-            Optional<OwnershipEntity> adminOp = entity.getOwnershipEntities().stream().filter(o -> o.getPermissionId().equals("ADMIN")).findFirst();
-            if (adminOp.isEmpty()) {
-                throw new Exception("No admin user found");
-            }
-
-            String resourcePath = entity.getResourcePath();
-            String tail = resourcePath.substring(resourcePath.indexOf(adminOp.get().getUserId()));
-
-            String[] collections = tail.split("/");
-
-            Optional<TransferMapping> optionalStorPref = drmsConnector.getActiveTransferMapping(entity, entity.getHostName());
-            if (optionalStorPref.isEmpty()) {
-                entity.setEventStatus(EventStatus.ERRORED.name());
-                entity.setError("Storage not found for host: " + entity.getHostName());
-                repository.save(entity);
-                return;
-            }
-
-            TransferMapping transferMapping = optionalStorPref.get();
-            String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
-            String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
-            String parentType = "Storage";
-
-            String parentId = sourceStorageId;
-            for (int i = 0; i < collections.length - 1; i++) {
-                String resourceName = collections[i];
-                String path = entity.getResourcePath().substring(0, entity.getResourcePath().indexOf(resourceName));
-                path = path.concat(resourceName);
-                String entityId = Utils.getId(path);
-                Optional<GenericResource> optionalGenericResource =
-                        this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, parentId, "COLLECTION", parentType);
-                if (optionalGenericResource.isPresent()) {
-                    parentId = optionalGenericResource.get().getResourceId();
-                    parentType = "COLLECTION";
-                } else {
-                    entity.setEventStatus(EventStatus.ERRORED.name());
-                    entity.setError("Collection structure creation failed: " + entity.getHostName());
-                    repository.save(entity);
-                    return;
-                }
-            }
-
-            Optional<GenericResource> optionalGenericResource =
-                    this.drmsConnector.createResource(repository, entity, entity.getResourceId(),
-                            collections[collections.length - 1], entity.getResourcePath(),
-                            parentId, "FILE", parentType);
-
-            String destinationResourceId = destinationStorageId + ":" + entity.getResourcePath() + ":" + entity.getResourceType();
-            String messageId = Utils.getId(destinationResourceId);
-
-            Optional<GenericResource> destinationFile = this.drmsConnector.createResource(repository, entity, messageId,
-                    entity.getResourceName(),
-                    entity.getResourcePath(),
-                    destinationStorageId,
-                    "FILE", "Storage");
-
-            if (optionalGenericResource.isPresent() && destinationFile.isPresent()) {
-                try {
-
-                    Optional<AnyStoragePreference> storagePreferenceOptional = this.drmsConnector
-                            .getStoragePreference(entity.getAuthToken(), adminOp.get().getUserId(), entity.getTenantId(), sourceStorageId);
-
-                    Optional<AnyStoragePreference> destinationPreferenceOptional = this.drmsConnector
-                            .getStoragePreference(entity.getAuthToken(), adminOp.get().getUserId(), entity.getTenantId(), destinationStorageId);
-                    if (storagePreferenceOptional.isPresent() && destinationPreferenceOptional.isPresent()) {
-                        String sourceCredentialToken = storagePreferenceOptional.get()
-                                .getSshStoragePreference()
-                                .getCredentialToken();
-                        String destinationCredentialToken = storagePreferenceOptional.get()
-                                .getSshStoragePreference()
-                                .getCredentialToken();
-
-                        this.workflowServiceConnector.invokeWorkflow(entity.getAuthToken(), adminOp.get().getUserId(),
-                                entity.getTenantId(), entity.getResourceId(), sourceCredentialToken,
-                                messageId, destinationCredentialToken);
-                        entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
-                        repository.save(entity);
-                    } else {
-                        String msg = "Cannot find storage preference for storage " + sourceStorageId + " for user " + adminOp.get().getUserId();
-                        entity.setError(msg);
-                        entity.setEventStatus(EventStatus.ERRORED.name());
-                        repository.save(entity);
-                        LOGGER.error(msg);
-                    }
-
-
-                } catch (Exception exception) {
-                    String msg = "Error occurred while invoking workflow manager" + exception.getMessage();
-                    entity.setError("Error occurred while invoking workflow manager " + exception.getMessage());
-                    entity.setEventStatus(EventStatus.ERRORED.name());
-                    repository.save(entity);
-                    LOGGER.error(msg, exception);
-                }
-            }
-        } catch (Exception exception) {
-            LOGGER.error("Error occurred while processing outbound data orchestrator event", exception);
-            entity.setEventStatus(EventStatus.ERRORED.name());
-            entity.setError("Error occurred while processing ");
-            repository.save(entity);
-        }
-    }
-
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
index df6edf7..ab077cc 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
@@ -60,6 +60,7 @@ message DataParsingJobOutput {
     string dataParserOutputInterfaceId = 2;
     string dataParsingJobId = 3;
     string outputType = 4;
+    string outputDirectoryResourceId = 5;
 }
 
 message DataParsingJob {
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
index af77f3e..7482c7a 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
@@ -90,115 +90,118 @@ public class DataParsingWorkflowManager {
     public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws Exception {
 
         WorkflowMessage workflowMessage = request.getMessage();
-        logger.info("Processing parsing workflow for resource {}", workflowMessage.getSourceResourceId());
-
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);
-
-        DelegateAuth delegateAuth = DelegateAuth.newBuilder()
-                .setUserId(workflowMessage.getUsername())
-                .setClientId(mftClientId)
-                .setClientSecret(mftClientSecret)
-                .putProperties("TENANT_ID", workflowMessage.getTenantId()).build();
-
-        FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
-                .setResourceType("SCP")
-                .setResourceId(workflowMessage.getSourceResourceId())
-                .setResourceToken(workflowMessage.getSourceCredentialToken())
-                .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
-
-        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
-        DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
-
-        ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
-
-        Map<String, StringMap> parserInputMappings = new HashMap<>();
-        List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
-            List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
-
-            boolean match = true;
-            StringMap stringMap = new StringMap();
-            for (DataParsingJobInput pji : pjis) {
-
-                ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
-                Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
-                bindings.put("polyglot.js.allowHostAccess", true);
-                bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
-                bindings.put("metadata", metadata);
-                try {
-                    Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
-                    stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
-                    match = match && eval;
-                } catch (ScriptException e) {
-                    logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
-                    match = false;
+
+        for (String sourceResourceId : workflowMessage.getSourceResourceIdsList()) {
+            logger.info("Processing parsing workflow for resource {}", sourceResourceId);
+
+            MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);
+
+            DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+                    .setUserId(workflowMessage.getUsername())
+                    .setClientId(mftClientId)
+                    .setClientSecret(mftClientSecret)
+                    .putProperties("TENANT_ID", workflowMessage.getTenantId()).build();
+
+            FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
+                    .setResourceType("SCP")
+                    .setResourceId(sourceResourceId)
+                    .setResourceToken(workflowMessage.getSourceCredentialToken())
+                    .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
+
+            ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
+            DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
+
+            ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
+
+            Map<String, StringMap> parserInputMappings = new HashMap<>();
+            List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
+                List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
+
+                boolean match = true;
+                StringMap stringMap = new StringMap();
+                for (DataParsingJobInput pji : pjis) {
+
+                    ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
+                    Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
+                    bindings.put("polyglot.js.allowHostAccess", true);
+                    bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
+                    bindings.put("metadata", metadata);
+                    try {
+                        Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
+                        stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
+                        match = match && eval;
+                    } catch (ScriptException e) {
+                        logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
+                        match = false;
+                    }
                 }
-            }
 
-            if (match) {
-                parserInputMappings.put(pj.getParserId(), stringMap);
-            }
-            return match;
-        }).collect(Collectors.toList());
-
-        Map<String, AbstractTask> taskMap = new HashMap<>();
-
-        SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
-        downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
-        downloadTask.setMftClientId(mftClientId);
-        downloadTask.setMftClientSecret(mftClientSecret);
-        downloadTask.setUserId(workflowMessage.getUsername());
-        downloadTask.setTenantId(workflowMessage.getTenantId());
-        downloadTask.setMftHost(mftHost);
-        downloadTask.setMftPort(mftPort);
-        downloadTask.setSourceResourceId(workflowMessage.getSourceResourceId());
-        downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
-
-        taskMap.put(downloadTask.getTaskId(), downloadTask);
-
-        for(String parserId: parserInputMappings.keySet()) {
-
-            GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
-            dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
-            dataParsingTask.setParserId(parserId);
-            dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
-            taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
-
-            OutPort outPort = new OutPort();
-            outPort.setNextTaskId(dataParsingTask.getTaskId());
-            downloadTask.addOutPort(outPort);
-
-            DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj -> pj.getParserId().equals(parserId)).findFirst().get();
-            ParserFetchResponse parser = parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
-
-            for (DataParserOutputInterface dataParserOutputInterface: parser.getParser().getOutputInterfacesList()) {
-
-                Optional<DataParsingJobOutput> dataParsingJobOutput = dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
-                        o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
-                        .findFirst();
-
-                if (dataParsingJobOutput.isPresent() && dataParsingJobOutput.get().getOutputType().equals("JSON")) {
-                    MetadataPersistTask mpt = new MetadataPersistTask();
-                    mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
-                    mpt.setDrmsHost(drmsHost);
-                    mpt.setDrmsPort(drmsPort);
-                    mpt.setTenant(workflowMessage.getTenantId());
-                    mpt.setUser(workflowMessage.getUsername());
-                    mpt.setServiceAccountKey(mftClientId);
-                    mpt.setServiceAccountSecret(mftClientSecret);
-                    mpt.setResourceId(workflowMessage.getSourceResourceId());
-                    mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
-                    OutPort dpOut = new OutPort();
-                    dpOut.setNextTaskId(mpt.getTaskId());
-                    dataParsingTask.addOutPort(dpOut);
-                    taskMap.put(mpt.getTaskId(), mpt);
+                if (match) {
+                    parserInputMappings.put(pj.getParserId(), stringMap);
+                }
+                return match;
+            }).collect(Collectors.toList());
+
+            Map<String, AbstractTask> taskMap = new HashMap<>();
+
+            SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
+            downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
+            downloadTask.setMftClientId(mftClientId);
+            downloadTask.setMftClientSecret(mftClientSecret);
+            downloadTask.setUserId(workflowMessage.getUsername());
+            downloadTask.setTenantId(workflowMessage.getTenantId());
+            downloadTask.setMftHost(mftHost);
+            downloadTask.setMftPort(mftPort);
+            downloadTask.setSourceResourceId(sourceResourceId);
+            downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
+
+            taskMap.put(downloadTask.getTaskId(), downloadTask);
+
+            for(String parserId: parserInputMappings.keySet()) {
+
+                GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
+                dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
+                dataParsingTask.setParserId(parserId);
+                dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
+                taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
+
+                OutPort outPort = new OutPort();
+                outPort.setNextTaskId(dataParsingTask.getTaskId());
+                downloadTask.addOutPort(outPort);
+
+                DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj -> pj.getParserId().equals(parserId)).findFirst().get();
+                ParserFetchResponse parser = parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
+
+                for (DataParserOutputInterface dataParserOutputInterface: parser.getParser().getOutputInterfacesList()) {
+
+                    Optional<DataParsingJobOutput> dataParsingJobOutput = dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
+                            o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
+                            .findFirst();
+
+                    if (dataParsingJobOutput.isPresent() && dataParsingJobOutput.get().getOutputType().equals("JSON")) {
+                        MetadataPersistTask mpt = new MetadataPersistTask();
+                        mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
+                        mpt.setDrmsHost(drmsHost);
+                        mpt.setDrmsPort(drmsPort);
+                        mpt.setTenant(workflowMessage.getTenantId());
+                        mpt.setUser(workflowMessage.getUsername());
+                        mpt.setServiceAccountKey(mftClientId);
+                        mpt.setServiceAccountSecret(mftClientSecret);
+                        mpt.setResourceId(sourceResourceId);
+                        mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
+                        OutPort dpOut = new OutPort();
+                        dpOut.setNextTaskId(mpt.getTaskId());
+                        dataParsingTask.addOutPort(dpOut);
+                        taskMap.put(mpt.getTaskId(), mpt);
+                    }
                 }
-            }
 
-        }
+            }
 
-        String[] startTaskIds = {downloadTask.getTaskId()};
-        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
+            String[] startTaskIds = {downloadTask.getTaskId()};
+            String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
 
-        logger.info("Submitted workflow {} to parse resource {}", workflowId, workflowMessage.getSourceResourceId());
+            logger.info("Submitted workflow {} to parse resource {}", workflowId, sourceResourceId);
+        }
     }
 }
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
index d12a365..ff0ee59 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
@@ -90,8 +90,6 @@ public class DataSyncWorkflowManager {
     @Value("${custos.secret}")
     private String custosSecret;
 
-
-
     @Value("${drms.host}")
     private String drmsHost;
 
@@ -214,7 +212,7 @@ public class DataSyncWorkflowManager {
 //        dt1.setMftCallbackStoreHost(datasyncWmHost);
 //        dt1.setMftCallbackStorePort(datasyncWmPort);
 
-        DataTransferPreValidationTask dt1 = new DataTransferPreValidationTask();
+        /*DataTransferPreValidationTask dt1 = new DataTransferPreValidationTask();
         dt1.setTenantId(request.getMessage().getTenantId());
         dt1.setCustosHost(custosHost);
         dt1.setCustosPort(custosPort);
@@ -239,6 +237,6 @@ public class DataSyncWorkflowManager {
         //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
         String[] startTaskIds = {dt1.getTaskId()};
         String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
-        logger.info("Launched workflow {}", workflowId);
+        logger.info("Launched workflow {}", workflowId);*/
     }
 }
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
index 29be8a4..8128ff3 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
@@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 @GRpcService
 public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServiceImplBase {
@@ -43,13 +45,22 @@ public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServic
     @Autowired
     private DataParsingWorkflowManager dataParsingWorkflowManager;
 
+    private final ScheduledExecutorService workflowExecutorService = Executors.newSingleThreadScheduledExecutor();
+
     @Override
     public void invokeWorkflow(WorkflowInvocationRequest request,
                                StreamObserver<WorkflowInvocationResponse> responseObserver) {
         try {
-            logger.info("Invoking workflow executor for resource {}", request.getMessage().getSourceResourceId());
-            //dataSyncWorkflowManager.submitDataSyncWorkflow(request);
-            dataParsingWorkflowManager.submitDataParsingWorkflow(request);
+
+            logger.info("Invoking workflow executor");
+            workflowExecutorService.submit(() -> {
+                try {
+                    dataParsingWorkflowManager.submitDataParsingWorkflow(request);
+                } catch (Exception e) {
+                    logger.error("Failed to submit data parsing workflow for destination resource {}",
+                            request.getMessage().getDestinationResourceId());
+                }
+            });
             responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
             responseObserver.onCompleted();
         } catch (Exception ex) {
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index c1b5da3..532aff2 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -26,8 +26,7 @@ import "google/protobuf/empty.proto";
 
 message WorkflowMessage {
     string message_id = 1;
-    string resource_id = 2;
-    string source_resource_id = 3;
+    repeated string source_resource_ids = 3;
     string destination_resource_id = 4;
     string username = 5;
     string tenantId = 6;