You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/07 04:53:58 UTC

[airavata-data-lake] branch master updated: complete transfer mapping, storage source and destination detection, collection herachy creation from path

This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new f2c2ac0  complete transfer mapping, storage source and destination detection, collection herachy creation from path
     new d48c786  Merge pull request #16 from isururanawaka/workflow_merge
f2c2ac0 is described below

commit f2c2ac0b293814de80afde9e91f6ebef700a8069
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Wed Jul 7 00:52:57 2021 -0400

    complete transfer mapping, storage source and destination detection, collection herachy creation from path
---
 .../file/client/model/Configuration.java           |  50 +++----
 .../file/client/watcher/FileWatcher.java           |   2 +-
 .../src/main/resources/application.properties      |   2 +-
 .../src/main/resources/config.yml                  |   2 +-
 .../messaging/model/NotificationEvent.java         |  20 +--
 .../model/NotificationEventDeserializer.java       |   2 +-
 .../model/NotificationEventSerializer.java         |   2 +-
 .../persistance/DataOrchestratorEntity.java        |  10 +-
 .../airavata/datalake/orchestrator/Utils.java      |  29 ++++
 .../orchestrator/connectors/DRMSConnector.java     | 159 +++++++++++++++++++++
 .../connectors/WorkflowServiceConnector.java       |  72 ++++++++++
 .../db/inmemory/DefaultInMemoryStore.java          |  74 ----------
 .../processor/InboundEventProcessor.java           |  33 +----
 .../processor/OutboundEventProcessor.java          | 146 +++++++------------
 .../datalake/orchestrator/core/AbstractTask.java   |  26 ----
 .../orchestrator/core/adaptors/StorageAdaptor.java |  17 ---
 .../core/connector/AbstractConnector.java          |  14 ++
 .../core/processor/MessageProcessor.java           |   4 +-
 .../drms/api/handlers/ResourceServiceHandler.java  |  17 +--
 .../drms-rest-proxy/src/main/resources/drms.pb     | Bin 105597 -> 109380 bytes
 .../src/main/proto/resource/DRMSResource.proto     |   1 +
 21 files changed, 381 insertions(+), 301 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
index 03aeba8..630db9c 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
@@ -2,7 +2,7 @@ package org.apache.airavata.dataorchestrator.file.client.model;
 
 public class Configuration {
     private String listeningPath;
-    private String storagePreferenceId;
+    private String hostName;
 
     private Producer producer;
 
@@ -11,12 +11,12 @@ public class Configuration {
     public Configuration() {
     }
 
-    public String getStoragePreferenceId() {
-        return storagePreferenceId;
+    public String getHostName() {
+        return hostName;
     }
 
-    public void setStoragePreferenceId(String storagePreferenceId) {
-        this.storagePreferenceId = storagePreferenceId;
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
     }
 
     public String getListeningPath() {
@@ -73,36 +73,36 @@ public class Configuration {
         }
     }
 
-   public static class Custos {
+    public static class Custos {
 
         private String serviceAccountId;
         private String serviceAccountSecret;
         private String tenantId;
 
-       public String getServiceAccountId() {
-           return serviceAccountId;
-       }
+        public String getServiceAccountId() {
+            return serviceAccountId;
+        }
 
-       public void setServiceAccountId(String serviceAccountId) {
-           this.serviceAccountId = serviceAccountId;
-       }
+        public void setServiceAccountId(String serviceAccountId) {
+            this.serviceAccountId = serviceAccountId;
+        }
 
-       public String getServiceAccountSecret() {
-           return serviceAccountSecret;
-       }
+        public String getServiceAccountSecret() {
+            return serviceAccountSecret;
+        }
 
-       public void setServiceAccountSecret(String serviceAccountSecret) {
-           this.serviceAccountSecret = serviceAccountSecret;
-       }
+        public void setServiceAccountSecret(String serviceAccountSecret) {
+            this.serviceAccountSecret = serviceAccountSecret;
+        }
 
-       public String getTenantId() {
-           return tenantId;
-       }
+        public String getTenantId() {
+            return tenantId;
+        }
 
-       public void setTenantId(String tenantId) {
-           this.tenantId = tenantId;
-       }
-   }
+        public void setTenantId(String tenantId) {
+            this.tenantId = tenantId;
+        }
+    }
 
 
 }
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
index d7870c2..9e16018 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
@@ -167,7 +167,7 @@ public class FileWatcher implements Runnable {
                 + ":" + configuration.getCustos().getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)));
         context.setBasePath(configuration.getListeningPath());
         context.setTenantId(configuration.getCustos().getTenantId());
-        context.setStoragePreferenceId(configuration.getStoragePreferenceId());
+        context.setHostName(configuration.getHostName());
         event.setContext(context);
         return event;
     }
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties
index 8c8bbf1..6e03154 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/application.properties
@@ -1 +1 @@
-config.path=/Users/dimuthu/code/airavata-data-lake/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
\ No newline at end of file
+config.path=/Users/isururanawaka/Documents/Airavata_Repository/airavata-data-lake/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
\ No newline at end of file
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
index 5c1d0ac..cae9b9f 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/resources/config.yml
@@ -1,5 +1,5 @@
 listeningPath: "/Users/isururanawaka/Documents/texts"
-storagePreferenceId:""
+hostName: "beta.iubemcenter.scigap.org"
 
 producer:
   brokerURL: "localhost:9092"
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
index fc4a09a..1cda9db 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
@@ -68,7 +68,7 @@ public class  NotificationEvent {
         private Long occuredTime;
         private String authToken;
         private String tenantId;
-        private String storagePreferenceId;
+        private String hostName;
         private String basePath;
 
 
@@ -104,14 +104,6 @@ public class  NotificationEvent {
             this.tenantId = tenantId;
         }
 
-        public String getStoragePreferenceId() {
-            return storagePreferenceId;
-        }
-
-        public void setStoragePreferenceId(String storagePreferenceId) {
-            this.storagePreferenceId = storagePreferenceId;
-        }
-
         public String getBasePath() {
             return basePath;
         }
@@ -119,10 +111,18 @@ public class  NotificationEvent {
         public void setBasePath(String basePath) {
             this.basePath = basePath;
         }
+
+        public String getHostName() {
+            return hostName;
+        }
+
+        public void setHostName(String hostName) {
+            this.hostName = hostName;
+        }
     }
 
     public String getResourceId() {
-        return context.storagePreferenceId + ":" + resourcePath + ":" + resourceType;
+        return context.hostName+ ":" + resourcePath + ":" + resourceType;
     }
 
 
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index 5fddbcd..f19d272 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -25,7 +25,7 @@ public class NotificationEventDeserializer implements Deserializer<NotificationE
         context.setOccuredTime(Long.valueOf(parts[2]));
         context.setAuthToken(String.valueOf(parts[3]));
         context.setTenantId(String.valueOf(parts[4]));
-        context.setStoragePreferenceId(parts[5]);
+        context.setHostName(parts[5]);
         context.setBasePath(parts[6]);
         event.setResourcePath(parts[7]);
         event.setResourceType(parts[8]);
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index e71ee83..7a55770 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -20,7 +20,7 @@ public class NotificationEventSerializer implements Serializer<NotificationEvent
                 notificationEvent.getContext().getOccuredTime() + "," +
                 notificationEvent.getContext().getAuthToken() + "," +
                 notificationEvent.getContext().getTenantId() + "," +
-                notificationEvent.getContext().getStoragePreferenceId()+ "," +
+                notificationEvent.getContext().getHostName() + "," +
                 notificationEvent.getContext().getBasePath() + "," +
                 notificationEvent.getResourcePath() + "," +
                 notificationEvent.getResourceType() + "," +
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java
index 3815f08..562690e 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/DataOrchestratorEntity.java
@@ -65,7 +65,7 @@ public class DataOrchestratorEntity {
     private String authToken;
 
     @Column(nullable = false)
-    private String storagePreferenceId;
+    private String hostName;
 
     @Lob
     private String error;
@@ -202,11 +202,11 @@ public class DataOrchestratorEntity {
         this.authToken = authToken;
     }
 
-    public String getStoragePreferenceId() {
-        return storagePreferenceId;
+    public String getHostName() {
+        return hostName;
     }
 
-    public void setStoragePreferenceId(String storagePreferenceId) {
-        this.storagePreferenceId = storagePreferenceId;
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
     }
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
new file mode 100644
index 0000000..420c042
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
@@ -0,0 +1,29 @@
+package org.apache.airavata.datalake.orchestrator;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class Utils {
+
+    public static String getId(String message) throws NoSuchAlgorithmException {
+        MessageDigest md = MessageDigest.getInstance("SHA-256");
+        // digest() method called
+        // to calculate message digest of an input
+        // and return array of byte
+        byte[] array = md.digest(message.getBytes(StandardCharsets.UTF_8));
+        // Convert byte array into signum representation
+        BigInteger number = new BigInteger(1, array);
+
+        // Convert message digest into hex value
+        StringBuilder hexString = new StringBuilder(number.toString(16));
+
+        // Pad with leading zeros
+        while (hexString.length() < 32) {
+            hexString.insert(0, '0');
+        }
+
+        return hexString.toString();
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
new file mode 100644
index 0000000..d76ffa6
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -0,0 +1,159 @@
+package org.apache.airavata.datalake.orchestrator.connectors;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * DRMS connector to connect with DRMS service
+ */
+public class DRMSConnector implements AbstractConnector<Configuration> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DRMSConnector.class);
+
+    private ManagedChannel drmsChannel;
+    private ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub;
+    private StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub storagePreferenceServiceBlockingStub;
+
+    public DRMSConnector(Configuration configuration) throws Exception {
+        this.init(configuration);
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        this.drmsChannel = ManagedChannelBuilder
+                .forAddress(configuration.getOutboundEventProcessor().getDrmsHost(),
+                        configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
+        this.resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(drmsChannel);
+        this.storagePreferenceServiceBlockingStub = StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
+
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.drmsChannel.shutdown();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return !this.drmsChannel.isShutdown();
+    }
+
+    public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(entity.getAuthToken())
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(entity.getOwnerId())
+                        .setTenantId(entity.getTenantId())
+                        .build())
+                .build();
+        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+                .setAuthToken(serviceAuthToken)
+                .build();
+        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+        List<TransferMapping> transferMappingList = response.getMappingsList();
+        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+        if (!transferMappingList.isEmpty()) {
+            transferMappingList.forEach(transferMapping -> {
+                if (transferMapping.getSourceStoragePreference().getStorageCase()
+                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+                    if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
+                            .getStorage().getHostName().equals(hostname)) {
+                        storagePreferenceId
+                                .set(transferMapping.getSourceStoragePreference()
+                                        .getSshStoragePreference().getStoragePreferenceId());
+                    }
+                }
+            });
+        }
+        return Optional.ofNullable(storagePreferenceId.get());
+    }
+
+    public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(entity.getAuthToken())
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(entity.getOwnerId())
+                        .setTenantId(entity.getTenantId())
+                        .build())
+                .build();
+        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+                .setAuthToken(serviceAuthToken)
+                .build();
+        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+        List<TransferMapping> transferMappingList = response.getMappingsList();
+        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+        if (!transferMappingList.isEmpty()) {
+            transferMappingList.forEach(transferMapping -> {
+                if (transferMapping.getDestinationStoragePreference().getStorageCase()
+                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+                    if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
+                            .getStorage().getHostName().equals(hostname)) {
+                        storagePreferenceId
+                                .set(transferMapping.getDestinationStoragePreference()
+                                        .getSshStoragePreference().getStoragePreferenceId());
+                    }
+                }
+            });
+        }
+        return Optional.ofNullable(storagePreferenceId.get());
+    }
+
+
+    public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
+                                                    String resourceId,
+                                                    String resourceName,
+                                                    String resourcePath,
+                                                    String parentId,
+                                                    String type) {
+        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(entity.getAuthToken())
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(entity.getOwnerId())
+                        .setTenantId(entity.getTenantId())
+                        .build())
+                .build();
+
+        GenericResource genericResource = GenericResource
+                .newBuilder()
+                .setResourceId(resourceId)
+                .setResourceName(resourceName)
+                .setResourcePath(resourcePath)
+                .setType(type)
+                .setParentId(parentId).build();
+        ResourceCreateRequest resourceCreateRequest = ResourceCreateRequest
+                .newBuilder()
+                .setAuthToken(serviceAuthToken)
+                .setResource(genericResource)
+                .build();
+
+        try {
+            ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
+            return Optional.ofNullable(resourceCreateResponse.getResource());
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
+            entity.setEventStatus(EventStatus.ERRORED.name());
+            entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
+            repository.save(entity);
+            return Optional.empty();
+        }
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
new file mode 100644
index 0000000..d3cbc5e
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
@@ -0,0 +1,72 @@
+package org.apache.airavata.datalake.orchestrator.connectors;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
+import org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Drms connector to call DRMS services
+ */
+
+public class WorkflowServiceConnector implements AbstractConnector<Configuration> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DRMSConnector.class);
+
+    private ManagedChannel workflowChannel;
+    private WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceStub;
+
+
+    public WorkflowServiceConnector(Configuration configuration) throws Exception {
+        this.init(configuration);
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        this.workflowChannel = ManagedChannelBuilder
+                .forAddress(configuration.getOutboundEventProcessor().getWorkflowEngineHost(),
+                        configuration.getOutboundEventProcessor().getWorkflowPort()).usePlaintext().build();
+        this.workflowServiceStub = WorkflowServiceGrpc.newBlockingStub(workflowChannel);
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    @Override
+    public boolean isOpen() {
+        return false;
+    }
+
+    public void invokeWorkflow(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity, GenericResource resource) {
+        try {
+            WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken
+                    .newBuilder()
+                    .setAccessToken("")
+                    .build();
+            WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
+                    .setResourceId(resource.getResourceId())
+                    .build();
+
+            WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
+                    .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
+            this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while invoking workflow engine", entity.getResourceId(), ex);
+            entity.setEventStatus(EventStatus.ERRORED.name());
+            entity.setError("Error occurred while invoking workflow engine" + ex.getMessage());
+            repository.save(entity);
+            return;
+        }
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/db/inmemory/DefaultInMemoryStore.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/db/inmemory/DefaultInMemoryStore.java
deleted file mode 100644
index 7630e47..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/db/inmemory/DefaultInMemoryStore.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.db.inmemory;
-
-import org.apache.airavata.datalake.orchestrator.core.adaptors.StorageAdaptor;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Stack;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Default in memory store that stores all transient events
- */
-public class DefaultInMemoryStore implements StorageAdaptor<NotificationEvent> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultInMemoryStore.class);
-
-    private static final ConcurrentHashMap<String, Stack<NotificationEvent>> inmemoryStore = new ConcurrentHashMap();
-    private static final ConcurrentLinkedQueue<String> resourceIdQueue = new ConcurrentLinkedQueue<>();
-
-
-    @Override
-    public void save(NotificationEvent object) {
-        inmemoryStore.computeIfAbsent(object.getResourceId(), k -> new Stack<NotificationEvent>()).push(object);
-        resourceIdQueue.add(object.getResourceId());
-    }
-
-    @Override
-    public void delete(String id) {
-        inmemoryStore.computeIfPresent(id, (k, v) -> {
-            v.remove(id);
-            return v;
-        });
-        resourceIdQueue.remove(id);
-    }
-
-    @Override
-    public NotificationEvent get(String id) {
-        return inmemoryStore.get(id).get(0);
-    }
-
-    @Override
-    public NotificationEvent update(NotificationEvent object) throws UnsupportedOperationException {
-        throw new UnsupportedOperationException("Update events are not supported in default inmemory store");
-    }
-
-    @Override
-    public List<NotificationEvent> poll(int numOfEvents) {
-        List<NotificationEvent> notificationEventList = new ArrayList<>();
-        AtomicInteger count = new AtomicInteger(1);
-        int iterations = 0;
-        while (count.get() <= numOfEvents) {
-            String value = resourceIdQueue.poll();
-            List<NotificationEvent> finalNotificationEventList = notificationEventList;
-            Optional.ofNullable(value).ifPresent(val-> {
-                Stack events =  inmemoryStore.remove(val);
-                count.set(count.get() + events.size());
-                finalNotificationEventList.addAll(events);
-            });
-            iterations++;
-            if (iterations > 100) {
-                break;
-            }
-        }
-        return notificationEventList;
-    }
-
-
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
index 92fc28b..faddd61 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
@@ -1,6 +1,7 @@
 package org.apache.airavata.datalake.orchestrator.processor;
 
 import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
 import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
@@ -11,9 +12,7 @@ import org.dozer.loader.api.BeanMappingBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Base64;
@@ -24,7 +23,7 @@ import java.util.regex.Pattern;
 /**
  * This class is responsible for pick events from kafka queue and publish them into inmemory store
  */
-public class InboundEventProcessor implements MessageProcessor {
+public class InboundEventProcessor implements MessageProcessor<Configuration> {
     private static final Logger LOGGER = LoggerFactory.getLogger(InboundEventProcessor.class);
     private Configuration configuration;
     private NotificationEvent notificationEvent;
@@ -34,15 +33,15 @@ public class InboundEventProcessor implements MessageProcessor {
 
     public InboundEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
                                  DataOrchestratorEventRepository repository) throws Exception {
-        this.configuration = configuration;
         this.notificationEvent = notificationEvent;
         this.repository = repository;
-        this.init();
+        this.init(configuration);
     }
 
     @Override
-    public void init() throws Exception {
+    public void init(Configuration configuration) throws Exception {
         try {
+            this.configuration = configuration;
             dozerBeanMapper = new DozerBeanMapper();
             BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
                 @Override
@@ -109,7 +108,7 @@ public class InboundEventProcessor implements MessageProcessor {
         entity.setEventStatus(EventStatus.DATA_ORCH_RECEIVED.name());
         entity.setEventType(event.getContext().getEvent().name());
         entity.setAuthToken(event.getContext().getAuthToken());
-        entity.setStoragePreferenceId(event.getContext().getStoragePreferenceId());
+        entity.setHostName(event.getContext().getHostName());
 
         String resourcePath = event.getResourcePath();
         String basePath = event.getContext().getBasePath();
@@ -124,27 +123,9 @@ public class InboundEventProcessor implements MessageProcessor {
                 .decode(event.getContext().getAuthToken().getBytes(StandardCharsets.UTF_8)));
         String agentId = authDecoded.split(":")[0];
         entity.setAgentId(agentId);
-        entity.setResourceId(getResourceId(event.getResourceId()));
+        entity.setResourceId(Utils.getId(event.getResourceId()));
         return entity;
     }
 
-    private String getResourceId(String message) throws NoSuchAlgorithmException {
-        MessageDigest md = MessageDigest.getInstance("SHA-256");
-        // digest() method called
-        // to calculate message digest of an input
-        // and return array of byte
-        byte[] array = md.digest(message.getBytes(StandardCharsets.UTF_8));
-        // Convert byte array into signum representation
-        BigInteger number = new BigInteger(1, array);
-
-        // Convert message digest into hex value
-        StringBuilder hexString = new StringBuilder(number.toString(16));
-
-        // Pad with leading zeros
-        while (hexString.length() < 32) {
-            hexString.insert(0, '0');
-        }
 
-        return hexString.toString();
-    }
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 7c9e7cc..8038061 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -1,64 +1,44 @@
 package org.apache.airavata.datalake.orchestrator.processor;
 
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import org.apache.airavata.datalake.drms.AuthCredentialType;
-import org.apache.airavata.datalake.drms.AuthenticatedUser;
-import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
 import org.apache.airavata.datalake.drms.resource.GenericResource;
-import org.apache.airavata.datalake.drms.storage.*;
 import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
+import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
+import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
 import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
-import org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
 import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
 import org.dozer.DozerBeanMapper;
 import org.dozer.loader.api.BeanMappingBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * This class is responsible  and publish events to registry and
  * Workflow engine
  */
-public class OutboundEventProcessor implements MessageProcessor {
+public class OutboundEventProcessor implements MessageProcessor<Configuration> {
     private static final Logger LOGGER = LoggerFactory.getLogger(OutboundEventProcessor.class);
 
     private DozerBeanMapper dozerBeanMapper;
     private DataOrchestratorEventRepository repository;
 
-    private final ManagedChannel workflowChannel;
-    private final ManagedChannel drmsChannel;
-    private final WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceStub;
-    private final ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub;
-    private final StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub storagePreferenceServiceBlockingStub;
+    private DRMSConnector drmsConnector;
+    private WorkflowServiceConnector workflowServiceConnector;
 
     public OutboundEventProcessor(Configuration configuration, DataOrchestratorEventRepository repository) throws Exception {
         this.repository = repository;
-        //convert these to SSL
-        this.workflowChannel = ManagedChannelBuilder
-                .forAddress(configuration.getOutboundEventProcessor().getWorkflowEngineHost(),
-                        configuration.getOutboundEventProcessor().getWorkflowPort()).usePlaintext().build();
-        this.drmsChannel = ManagedChannelBuilder.forAddress(configuration.getOutboundEventProcessor().getDrmsHost(),
-                configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
-        this.workflowServiceStub = WorkflowServiceGrpc.newBlockingStub(workflowChannel);
-        this.resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(drmsChannel);
-        this.storagePreferenceServiceBlockingStub = StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
-        this.init();
+        this.init(configuration);
     }
 
     @Override
-    public void init() throws Exception {
+    public void init(Configuration configuration) throws Exception {
+        this.drmsConnector = new DRMSConnector(configuration);
+        this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
         dozerBeanMapper = new DozerBeanMapper();
         BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
             @Override
@@ -72,8 +52,8 @@ public class OutboundEventProcessor implements MessageProcessor {
 
     @Override
     public void close() throws Exception {
-        this.workflowChannel.shutdown();
-        this.drmsChannel.shutdown();
+        this.drmsConnector.close();
+        this.workflowServiceConnector.close();
     }
 
 
@@ -103,84 +83,54 @@ public class OutboundEventProcessor implements MessageProcessor {
 
     private void processEvent(DataOrchestratorEntity entity) {
         try {
-            DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-                    .setAccessToken(entity.getAuthToken())
-                    .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-                    .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-                            .setUsername(entity.getOwnerId())
-                            .setTenantId(entity.getTenantId())
-                            .build())
-                    .build();
-
-            StoragePreferenceFetchRequest storagePreferenceFetchRequest = StoragePreferenceFetchRequest.newBuilder()
-                    .setStoragePreferenceId(entity.getStoragePreferenceId()).setAuthToken(serviceAuthToken).build();
-
-            StoragePreferenceFetchResponse response = storagePreferenceServiceBlockingStub
-                    .fetchStoragePreference(storagePreferenceFetchRequest);
-
-            if (!response.hasStoragePreference()) {
+
+            String ownerId = entity.getOwnerId();
+            String resourcePath = entity.getResourcePath();
+            String tail = resourcePath.substring(resourcePath.indexOf(ownerId));
+            String[] collections = tail.split("/");
+
+            Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+            if (optionalStorPref.isEmpty()) {
                 entity.setEventStatus(EventStatus.ERRORED.name());
-                entity.setError("StoragePreference not found ");
+                entity.setError("StoragePreference not found for host: " + entity.getHostName());
                 repository.save(entity);
                 return;
             }
 
-            //currently only ssh is working
-            if (response.getStoragePreference().getSshStoragePreference().isInitialized()) {
-                GenericResource genericResource = GenericResource
-                        .newBuilder()
-                        .setResourceId(entity.getResourceId())
-                        .setResourceName(entity.getResourceName())
-                        .setResourcePath(entity.getResourcePath())
-                        .setType("FILE")
-                        .setSshPreference(response.getStoragePreference().getSshStoragePreference()).build();
-                ResourceCreateRequest resourceCreateRequest = ResourceCreateRequest
-                        .newBuilder()
-                        .setAuthToken(serviceAuthToken)
-                        .setResource(genericResource)
-                        .build();
-                GenericResource resource = null;
-                try {
-                    ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
-                    resource = resourceCreateResponse.getResource();
-                } catch (Exception ex) {
-                    LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
-                    entity.setEventStatus(EventStatus.ERRORED.name());
-                    entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
-                    repository.save(entity);
-                    return;
-                }
+            String parentId = optionalStorPref.get();
+
+            for (int i = 1; i < collections.length - 1; i++) {
+                String resourceName = collections[i];
+                String entityId = Utils.getId(resourcePath.substring(resourcePath.indexOf(resourceName)));
+                String path = entity.getResourcePath().substring(0, entity.getResourcePath().indexOf(resourceName));
+                Optional<GenericResource> optionalGenericResource =
+                        this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, parentId, "COLLECTION");
+                if (optionalGenericResource.isPresent()) {
+                    parentId = optionalGenericResource.get().getResourceId();
 
-                try {
-                    WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken
-                            .newBuilder()
-                            .setAccessToken("")
-                            .build();
-                    WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
-                            .setResourceId(resource.getResourceId())
-                            .build();
-
-                    WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
-                            .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
-                    this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
-                } catch (Exception ex) {
-                    LOGGER.error("Error occurred while invoking workflow engine", entity.getResourceId(), ex);
+                } else {
                     entity.setEventStatus(EventStatus.ERRORED.name());
-                    entity.setError("Error occurred while invoking workflow engine" + ex.getMessage());
+                    entity.setError("Collection structure creation failed: " + entity.getHostName());
                     repository.save(entity);
                     return;
                 }
-            } else {
-                LOGGER.error("Incorrect storage preference  {}", entity.getStoragePreferenceId());
-                entity.setEventStatus(EventStatus.ERRORED.name());
-                entity.setError("Incorrect storage preference " + entity.getStoragePreferenceId());
+            }
+
+            Optional<GenericResource> optionalGenericResource =
+                    this.drmsConnector.createResource(repository, entity, entity.getResourceId(),
+                            collections[collections.length - 1], entity.getResourcePath(),
+                            parentId, "FILE");
+
+
+            if (optionalGenericResource.isPresent()) {
+                this.workflowServiceConnector.invokeWorkflow(repository, entity, optionalGenericResource.get());
+                entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
                 repository.save(entity);
-                return;
+            } else {
+
             }
-            entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
-            repository.save(entity);
         } catch (Exception exception) {
-            LOGGER.error("Error occurred while processing outbound data orcehstrator event", exception);
+            LOGGER.error("Error occurred while processing outbound data orchestrator event", exception);
             entity.setEventStatus(EventStatus.ERRORED.name());
             entity.setError("Error occurred while processing ");
             repository.save(entity);
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/AbstractTask.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/AbstractTask.java
deleted file mode 100644
index 91f6e59..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/AbstractTask.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.datalake.orchestrator.core;
-
-/**
- * TODO: Task framework implementation
- */
-public abstract class AbstractTask {
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/adaptors/StorageAdaptor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/adaptors/StorageAdaptor.java
deleted file mode 100644
index aab4c32..0000000
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/adaptors/StorageAdaptor.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.core.adaptors;
-
-import java.util.List;
-
-public interface StorageAdaptor<T> {
-
-    void save(T object) throws UnsupportedOperationException;
-
-    void delete(String id) throws UnsupportedOperationException;
-
-    T get(String id) throws UnsupportedOperationException;
-
-    T update(T object) throws UnsupportedOperationException;
-
-    List<T> poll(int numOfEvents) throws UnsupportedOperationException;
-
-}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/connector/AbstractConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/connector/AbstractConnector.java
new file mode 100644
index 0000000..f206255
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/connector/AbstractConnector.java
@@ -0,0 +1,14 @@
+package org.apache.airavata.datalake.orchestrator.core.connector;
+
+/**
+ * Interface to implement external connectors
+ */
+public interface AbstractConnector<T> {
+
+    void init(T configuration) throws Exception;
+
+    void close() throws Exception;
+
+    boolean isOpen();
+
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java
index 215b109..df63877 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-service-core/src/main/java/org/apache/airavata/datalake/orchestrator/core/processor/MessageProcessor.java
@@ -1,8 +1,8 @@
 package org.apache.airavata.datalake.orchestrator.core.processor;
 
-public interface MessageProcessor extends Runnable{
+public interface MessageProcessor<T> extends Runnable{
 
-     void init() throws Exception;
+     void init(T configuration) throws Exception;
 
       void close() throws Exception;
 }
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index 750b885..9201ead 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -120,21 +120,12 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
             userProps.put("username", callUser.getUsername());
             userProps.put("tenantId", callUser.getTenantId());
 
-
-            String storagePreferenceId = "";
-
-            if (request.getResource().getStoragePreferenceCase()
-                    .equals(GenericResource.StoragePreferenceCase.S3_PREFERENCE)) {
-                storagePreferenceId = request.getResource().getS3Preference().getStoragePreferenceId();
-            } else if (request.getResource().getStoragePreferenceCase()
-                    .equals(GenericResource.StoragePreferenceCase.SSH_PREFERENCE)) {
-                storagePreferenceId = request.getResource().getSshPreference().getStoragePreferenceId();
-            }
+            String parentId = request.getResource().getParentId();
 
             String entityId = request.getResource().getResourceId();
             Map<String, Object> serializedMap = GenericResourceSerializer.serializeToMap(request.getResource());
             Optional<Entity> exEntity = CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
-                    storagePreferenceId, type, entityId,
+                    parentId, type, entityId,
                     request.getResource().getResourceName(), request.getResource().getResourceName(),
                     callUser.getUsername());
 
@@ -148,10 +139,10 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
                 serializedMap.put("lastModifiedTime", exEntity.get().getCreatedAt());
                 serializedMap.put("owner", exEntity.get().getOwnerId());
 
-                if (!storagePreferenceId.isEmpty()) {
+                if (!parentId.isEmpty()) {
                     this.neo4JConnector.mergeNodesWithParentChildRelationShip(serializedMap, new HashMap<>(),
                             request.getResource().getType(), StoragePreferenceConstants.STORAGE_PREFERENCE_LABEL,
-                            callUser.getUsername(), entityId, storagePreferenceId, callUser.getTenantId());
+                            callUser.getUsername(), entityId, parentId, callUser.getTenantId());
                 } else {
                     this.neo4JConnector.mergeNode(serializedMap, request.getResource().getType(),
                             callUser.getUsername(), entityId, callUser.getTenantId());
diff --git a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 0daacb7..fd2c03a 100644
Binary files a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb and b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb differ
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
index 6a29891..db7741a 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
@@ -35,4 +35,5 @@ message GenericResource {
   string type=6;
   string parent_resource_path = 7;
   string resource_name = 8;
+  string parent_id = 9;
 }
\ No newline at end of file