Posted to commits@airavata.apache.org by di...@apache.org on 2021/08/07 21:12:40 UTC

[airavata-data-lake] branch master updated: Integrating data parsing workflow

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new cdbea12  Integrating data parsing workflow
cdbea12 is described below

commit cdbea1266872de8c83d0e592750b1a2fc77e2675
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Aug 7 17:12:26 2021 -0400

    Integrating data parsing workflow
---
 ansible/roles/mft/tasks/main.yml                   |   1 +
 .../mft/templates/secret-service/secrets.json.j2   |   2 +-
 .../src/main/dist/bin/orch-api-server-daemon.sh    |   4 +-
 .../src/main/dist/bin/orch-api-server.sh           |   2 +-
 ...ializer.java => DataOrchestratorAPIRunner.java} |   6 +-
 .../handlers/grpc/DataParserApiHandler.java        |  48 +++--
 .../src/main/proto/parsing.proto                   |  12 ++
 .../workflow/engine/wm/datasync/AppConfig.java     |   5 +
 .../wm/datasync/DataParsingWorkflowManager.java    | 202 +++++++++++++++++
 .../wm/datasync/WorkflowEngineAPIHandler.java      |   7 +-
 .../{APIRunner.java => WorkflowManagerRunner.java} |   6 +-
 .../workflow-engine/workflow-engine-core/pom.xml   |  30 +++
 .../workflow/engine/task/AbstractTask.java         |  20 +-
 .../workflow/engine/task/BlockingTask.java         |   6 +-
 .../workflow/engine/task/NonBlockingTask.java      |   4 +-
 .../workflow/engine/task/TaskUtil.java             |  17 --
 .../engine/task/impl/GenericDataParsingTask.java   | 240 +++++++++++++++++++++
 .../engine/task/impl/MetadataPersistTask.java      | 173 +++++++++++++++
 .../task/impl/SyncLocalDataDownloadTask.java       | 193 +++++++++++++++++
 .../{BlockingTask.java => types/StringMap.java}    |  25 +--
 .../workflow/engine/wm/WorkflowOperator.java       |   4 +-
 .../src/main/resources/task-list.yaml              |   3 +
 data-resource-management-service/drms-api/pom.xml  |   6 +
 .../drms/api/handlers/ResourceServiceHandler.java  |   3 +-
 .../handlers/StoragePreferenceServiceHandler.java  |   2 +
 pom.xml                                            |   9 +-
 26 files changed, 953 insertions(+), 77 deletions(-)

diff --git a/ansible/roles/mft/tasks/main.yml b/ansible/roles/mft/tasks/main.yml
index f47ceb0..dc6e243 100644
--- a/ansible/roles/mft/tasks/main.yml
+++ b/ansible/roles/mft/tasks/main.yml
@@ -18,6 +18,7 @@
   firewalld: port="{{ item }}/tcp"
     zone=public permanent=true state=enabled immediate=yes
   with_items:
+    - "{{ mft_api_service_grpc_port }}"
     - "{{ mft_default_agent_port }}"
     - "{{ mft_consul_port }}"
     - "{{ mft_resource_service_grpc_port }}"
diff --git a/ansible/roles/mft/templates/secret-service/secrets.json.j2 b/ansible/roles/mft/templates/secret-service/secrets.json.j2
index 30dcd0e..bfa8b53 100644
--- a/ansible/roles/mft/templates/secret-service/secrets.json.j2
+++ b/ansible/roles/mft/templates/secret-service/secrets.json.j2
@@ -1,7 +1,7 @@
 [
   {
       "type": "SCP",
-      "secretId": "ssh_storage_preference_emc_source",
+      "secretId": "ed1af924-2a91-412c-bedf-d9321252456d",
       "user": "ubuntu",
       "privateKey": "{{ vault_mft_agent_default_ssh_private_key }}",
       "publicKey": "{{ vault_mft_agent_default_ssh_public_key }}",
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh
index 81d17fe..70a4333 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh
@@ -41,7 +41,7 @@ case $1 in
         echo "Starting $SERVICE_NAME ..."
         if [ ! -f $PID_PATH_NAME ]; then
             nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
-            org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+            org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
             echo $! > $PID_PATH_NAME
             echo "$SERVICE_NAME started ..."
         else
@@ -90,7 +90,7 @@ case $1 in
             rm $PID_PATH_NAME
             echo "$SERVICE_NAME starting ..."
             nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
-            org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+            org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
             echo $! > $PID_PATH_NAME
             echo "$SERVICE_NAME started ..."
         else
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh
index d146413..b1d0221 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh
@@ -67,5 +67,5 @@ do
 done
 
 java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
-    org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $*
+    org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $*
 
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/APIServerInitializer.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
similarity index 94%
rename from data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/APIServerInitializer.java
rename to data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
index e687e1f..7fcefc3 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/APIServerInitializer.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
@@ -43,8 +43,8 @@ import java.io.InputStream;
 @EnableJpaAuditing
 @EnableJpaRepositories("org.apache.airavata.datalake")
 @EntityScan("org.apache.airavata.datalake")
-public class APIServerInitializer implements CommandLineRunner {
-    private static final Logger LOGGER = LoggerFactory.getLogger(APIServerInitializer.class);
+public class DataOrchestratorAPIRunner implements CommandLineRunner {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DataOrchestratorAPIRunner.class);
 
     @Autowired
     private OrchestratorEventHandler orchestratorEventHandler;
@@ -53,7 +53,7 @@ public class APIServerInitializer implements CommandLineRunner {
     private String configPath;
 
     public static void main(String[] args) {
-        SpringApplication.run(APIServerInitializer.class, args);
+        SpringApplication.run(DataOrchestratorAPIRunner.class, args);
     }
 
     @Override
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java
index 425cc96..30d2dd1 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java
@@ -28,6 +28,7 @@ import org.lognet.springboot.grpc.GRpcService;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.List;
+import java.util.Optional;
 
 @GRpcService
 public class DataParserApiHandler extends DataParserServiceGrpc.DataParserServiceImplBase {
@@ -50,26 +51,13 @@ public class DataParserApiHandler extends DataParserServiceGrpc.DataParserServic
 
     @Override
     public void listParsers(ParserListRequest request, StreamObserver<ParserListResponse> responseObserver) {
-        DozerBeanMapper mapper = new DozerBeanMapper();
 
         ParserListResponse.Builder response = ParserListResponse.newBuilder();
 
         List<DataParserEntity> allParsers = parserRepo.findAll();
         allParsers.forEach(dataParserEntity -> {
-            DataParser.Builder parserBuilder = DataParser.newBuilder();
-            mapper.map(dataParserEntity, parserBuilder);
-            dataParserEntity.getInputInterfacesList().forEach(dataParserInputInterfaceEntity -> {
-                DataParserInputInterface.Builder inputBuilder = DataParserInputInterface.newBuilder();
-                mapper.map(dataParserInputInterfaceEntity, inputBuilder);
-                parserBuilder.addInputInterfaces(inputBuilder);
-            });
 
-            dataParserEntity.getOutputInterfacesList().forEach(dataParserOutputInterfaceEntity -> {
-                DataParserOutputInterface.Builder outputBuilder = DataParserOutputInterface.newBuilder();
-                mapper.map(dataParserOutputInterfaceEntity, outputBuilder);
-                parserBuilder.addOutputInterfaces(outputBuilder);
-            });
-            response.addParsers(parserBuilder);
+            response.addParsers(mapParser(dataParserEntity));
         });
 
         responseObserver.onNext(response.build());
@@ -77,6 +65,38 @@ public class DataParserApiHandler extends DataParserServiceGrpc.DataParserServic
     }
 
     @Override
+    public void fetchParser(ParserFetchRequest request, StreamObserver<ParserFetchResponse> responseObserver) {
+        Optional<DataParserEntity> entityOp = this.parserRepo.findById(request.getParserId());
+        if (entityOp.isPresent()) {
+            responseObserver.onNext(ParserFetchResponse.newBuilder().setParser(mapParser(entityOp.get())).build());
+            responseObserver.onCompleted();
+
+        } else {
+            responseObserver.onError(new Exception("Couldn't find a parser with id " + request.getParserId()));
+        }
+    }
+
+    private DataParser.Builder mapParser(DataParserEntity dataParserEntity) {
+        DozerBeanMapper mapper = new DozerBeanMapper();
+
+        DataParser.Builder parserBuilder = DataParser.newBuilder();
+        mapper.map(dataParserEntity, parserBuilder);
+        dataParserEntity.getInputInterfacesList().forEach(dataParserInputInterfaceEntity -> {
+            DataParserInputInterface.Builder inputBuilder = DataParserInputInterface.newBuilder();
+            mapper.map(dataParserInputInterfaceEntity, inputBuilder);
+            parserBuilder.addInputInterfaces(inputBuilder);
+        });
+
+        dataParserEntity.getOutputInterfacesList().forEach(dataParserOutputInterfaceEntity -> {
+            DataParserOutputInterface.Builder outputBuilder = DataParserOutputInterface.newBuilder();
+            mapper.map(dataParserOutputInterfaceEntity, outputBuilder);
+            parserBuilder.addOutputInterfaces(outputBuilder);
+        });
+
+        return parserBuilder;
+    }
+
+    @Override
     public void registerParsingJob(ParsingJobRegisterRequest request, StreamObserver<ParsingJobRegisterResponse> responseObserver) {
         DozerBeanMapper mapper = new DozerBeanMapper();
         DataParsingJobEntity entity = mapper.map(request.getParsingJob(), DataParsingJobEntity.class);
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
index 635e261..df6edf7 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
@@ -77,6 +77,14 @@ message ParserRegisterResponse {
     string parserId = 1;
 }
 
+message ParserFetchRequest {
+    string parserId = 1;
+}
+
+message ParserFetchResponse {
+    DataParser parser = 1;
+}
+
 message ParserListRequest {
 
 }
@@ -103,6 +111,10 @@ message ParsingJobListResponse {
 
 
 service DataParserService {
+
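+    // Fetches a single registered parser by id; complements the existing list operation.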
+    rpc fetchParser (ParserFetchRequest) returns (ParserFetchResponse) {
+    }
+
     rpc registerParser (ParserRegisterRequest) returns (ParserRegisterResponse) {
     }
 
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java
index b91922a..f7c976f 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java
@@ -33,6 +33,11 @@ public class AppConfig {
         return new DataSyncWorkflowManager();
     }
 
+    @Bean(initMethod = "init")
+    public DataParsingWorkflowManager dataParsingWorkflowManager() {
+        return new DataParsingWorkflowManager();
+    }
+
     @Bean
     public CallbackWorkflowStore callbackWorkflowStore() {
         return new CallbackWorkflowStore();
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
new file mode 100644
index 0000000..75eff4c
--- /dev/null
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.workflow.engine.wm.datasync;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.parsing.*;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.GenericDataParsingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.MetadataPersistTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.SyncLocalDataDownloadTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.types.StringMap;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.wm.WorkflowOperator;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
+import org.apache.airavata.mft.api.service.FileMetadataResponse;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.script.*;
+import java.util.*;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class DataParsingWorkflowManager {
+    private final static Logger logger = LoggerFactory.getLogger(DataParsingWorkflowManager.class);
+
+    @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+    private String clusterName;
+
+    @org.springframework.beans.factory.annotation.Value("${parsing.wm.name}")
+    private String workflowManagerName;
+
+    @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+    private String zkAddress;
+
+    @org.springframework.beans.factory.annotation.Value("${mft.host}")
+    private String mftHost;
+
+    @org.springframework.beans.factory.annotation.Value("${mft.port}")
+    private int mftPort;
+
+    @org.springframework.beans.factory.annotation.Value("${drms.host}")
+    private String drmsHost;
+
+    @org.springframework.beans.factory.annotation.Value("${drms.port}")
+    private int drmsPort;
+
+    private String mftClientId = "mft-agent";
+
+    private String mftClientSecret = "kHqH27BloDCbLvwUA8ZYRlHcJxXZyby9PB90bTdU";
+
+
+    @Autowired
+    private CallbackWorkflowStore callbackWorkflowStore;
+
+    private WorkflowOperator workflowOperator;
+
+    public void init() throws Exception {
+        workflowOperator = new WorkflowOperator();
+        workflowOperator.init(clusterName, workflowManagerName, zkAddress, callbackWorkflowStore);
+        logger.info("Successfully initialized Data Parsing Workflow Manager");
+    }
+
+    public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws Exception {
+
+        WorkflowMessage workflowMessage = request.getMessage();
+        logger.info("Processing parsing workflow for resource {}", workflowMessage.getSourceResourceId());
+
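+        // Fetch the source file's metadata through MFT; the parsing job selection queries below are evaluated against it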
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);
+
+        DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+                .setUserId(workflowMessage.getUsername())
+                .setClientId(mftClientId)
+                .setClientSecret(mftClientSecret)
+                .putProperties("TENANT_ID", workflowMessage.getTenantId()).build();
+
+        FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
+                .setResourceType("SCP")
+                .setResourceId(workflowMessage.getSourceResourceId())
+                .setResourceToken(workflowMessage.getSourceCredentialToken())
+                .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
+
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
+        DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
+
+        ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
+
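+        // Select the parsing jobs whose input selection queries all match the fetched metadata,
+        // recording an input-interface-to-path mapping for each matching parser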
+        Map<String, StringMap> parserInputMappings = new HashMap<>();
+        List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
+            List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
+
+            boolean match = true;
+            StringMap stringMap = new StringMap();
+            for (DataParsingJobInput pji : pjis) {
+
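+                // Polyglot (GraalVM-style) host access flags let the query script inspect the bound metadata object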
+                ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
+                Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
+                bindings.put("polyglot.js.allowHostAccess", true);
+                bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
+                bindings.put("metadata", metadata);
+                try {
+                    Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
+                    stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
+                    match = match && eval;
+                } catch (ScriptException e) {
+                    logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
+                    match = false;
+                }
+            }
+
+            if (match) {
+                parserInputMappings.put(pj.getParserId(), stringMap);
+            }
+            return match;
+        }).collect(Collectors.toList());
+
+        Map<String, AbstractTask> taskMap = new HashMap<>();
+
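+        // Workflow shape: a single download task fans out to one parsing task per matching parser,
+        // and each JSON output of a parsing task feeds a metadata persist task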
+        SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
+        downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
+        downloadTask.setMftClientId(mftClientId);
+        downloadTask.setMftClientSecret(mftClientSecret);
+        downloadTask.setUserId(workflowMessage.getUsername());
+        downloadTask.setTenantId(workflowMessage.getTenantId());
+        downloadTask.setMftHost(mftHost);
+        downloadTask.setMftPort(mftPort);
+        downloadTask.setSourceResourceId(workflowMessage.getSourceResourceId());
+        downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
+
+        taskMap.put(downloadTask.getTaskId(), downloadTask);
+
+        for(String parserId: parserInputMappings.keySet()) {
+
+            GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
+            dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
+            dataParsingTask.setParserId(parserId);
+            dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
+            taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
+
+            OutPort outPort = new OutPort();
+            outPort.setNextTaskId(dataParsingTask.getTaskId());
+            downloadTask.addOutPort(outPort);
+
+            DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj -> pj.getParserId().equals(parserId)).findFirst().get();
+            ParserFetchResponse parser = parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
+
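+            // Wire a metadata persist task to every parser output that the parsing job marks as JSON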
+            for (DataParserOutputInterface dataParserOutputInterface: parser.getParser().getOutputInterfacesList()) {
+
+                Optional<DataParsingJobOutput> dataParsingJobOutput = dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
+                        o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
+                        .findFirst();
+
+                if (dataParsingJobOutput.isPresent() && dataParsingJobOutput.get().getOutputType().equals("JSON")) {
+                    MetadataPersistTask mpt = new MetadataPersistTask();
+                    mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
+                    mpt.setDrmsHost(drmsHost);
+                    mpt.setDrmsPort(drmsPort);
+                    mpt.setTenant(workflowMessage.getTenantId());
+                    mpt.setUser(workflowMessage.getUsername());
+                    mpt.setServiceAccountKey(mftClientId);
+                    mpt.setServiceAccountSecret(mftClientSecret);
+                    mpt.setResourceId(workflowMessage.getSourceResourceId());
+                    mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
+                    OutPort dpOut = new OutPort();
+                    dpOut.setNextTaskId(mpt.getTaskId());
+                    dataParsingTask.addOutPort(dpOut);
+                    taskMap.put(mpt.getTaskId(), mpt);
+                }
+            }
+
+        }
+
+        String[] startTaskIds = {downloadTask.getTaskId()};
+        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
+
+        logger.info("Submitted workflow {} to parse resource {}", workflowId, workflowMessage.getSourceResourceId());
+    }
+}
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
index c51d0ef..29be8a4 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
@@ -40,11 +40,16 @@ public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServic
     @Autowired
     private DataSyncWorkflowManager dataSyncWorkflowManager;
 
+    @Autowired
+    private DataParsingWorkflowManager dataParsingWorkflowManager;
+
     @Override
     public void invokeWorkflow(WorkflowInvocationRequest request,
                                StreamObserver<WorkflowInvocationResponse> responseObserver) {
         try {
-            dataSyncWorkflowManager.submitDataSyncWorkflow(request);
+            logger.info("Invoking workflow executor for resource {}", request.getMessage().getSourceResourceId());
+            //dataSyncWorkflowManager.submitDataSyncWorkflow(request);
+            dataParsingWorkflowManager.submitDataParsingWorkflow(request);
             responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
             responseObserver.onCompleted();
         } catch (Exception ex) {
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/APIRunner.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowManagerRunner.java
similarity index 88%
rename from data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/APIRunner.java
rename to data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowManagerRunner.java
index b6af806..eacac7c 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/APIRunner.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowManagerRunner.java
@@ -20,12 +20,10 @@ package org.apache.airavata.datalake.workflow.engine.wm.datasync;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.WebApplicationType;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
 import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
-import org.springframework.context.annotation.ComponentScan;
 
 @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
         DataSourceTransactionManagerAutoConfiguration.class,
@@ -33,9 +31,9 @@ import org.springframework.context.annotation.ComponentScan;
 /*@EnableJpaAuditing
 @EnableJpaRepositories("org.apache.airavata.datalake")
 @EntityScan("org.apache.airavata.datalake")*/
-public class APIRunner implements CommandLineRunner {
+public class WorkflowManagerRunner implements CommandLineRunner {
     public static void main(String[] args) {
-        SpringApplication app = new SpringApplication(APIRunner.class);
+        SpringApplication app = new SpringApplication(WorkflowManagerRunner.class);
         app.setWebApplicationType(WebApplicationType.NONE);
         app.run(args);
     }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
index cd2e5cc..c4619ce 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
@@ -42,9 +42,18 @@
                     <groupId>com.google.guava</groupId>
                     <artifactId>guava</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons.io.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>24.0-jre</version>
@@ -103,6 +112,27 @@
             <artifactId>workflow-engine-client</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>data-orchestrator-api-stub</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java</artifactId>
+            <version>${docker.java.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.jaxrs</groupId>
+                    <artifactId>jackson-jaxrs-json-provider</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-json-provider</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
     </dependencies>
 
     <properties>
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index 5573a6e..fbf2f57 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -46,7 +46,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
     private BlockingQueue<TaskCallbackContext> callbackContextQueue = new LinkedBlockingQueue<>();
 
     @TaskOutPort(name = "nextTask")
-    private OutPort outPort;
+    private List<OutPort> outPorts = new ArrayList<>();
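+    // Each out port points at a successor task id, letting a task fan out to multiple downstream tasks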
 
     @TaskParam(name = "taskId")
     private ThreadLocal<String> taskId = new ThreadLocal<>();
@@ -87,19 +87,23 @@ public abstract class AbstractTask extends UserContentStore implements Task {
 
     @Override
     public void cancel() {
-        onCancel();
+        try {
+            onCancel();
+        } catch (Exception e) {
+            logger.error("Unknown error while cancelling task {}", getTaskId(), e);
+        }
     }
 
-    public abstract TaskResult onRun();
+    public abstract TaskResult onRun() throws Exception;
 
-    public abstract void onCancel();
+    public abstract void onCancel() throws Exception;
 
-    public OutPort getOutPort() {
-        return outPort;
+    public List<OutPort> getOutPorts() {
+        return outPorts;
     }
 
-    public void setOutPort(OutPort outPort) {
-        this.outPort = outPort;
+    public void addOutPort(OutPort outPort) {
+        this.outPorts.add(outPort);
     }
 
     public int getRetryCount() {
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
index 9033f1a..cec9c02 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
@@ -29,14 +29,14 @@ public abstract class BlockingTask extends AbstractTask {
     }
 
     @Override
-    public TaskResult onRun() {
+    public TaskResult onRun() throws Exception {
         return runBlockingCode();
     }
 
-    public abstract TaskResult runBlockingCode();
+    public abstract TaskResult runBlockingCode() throws Exception;
 
     @Override
-    public void onCancel() {
+    public void onCancel() throws Exception {
 
     }
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
index 2afecea..abf63df 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
@@ -36,7 +36,7 @@ public class NonBlockingTask extends AbstractTask {
     }
 
     @Override
-    public TaskResult onRun() {
+    public TaskResult onRun() throws Exception {
         Class<?> c = this.getClass();
         Method[] allMethods = c.getMethods();
         for (Method method : allMethods) {
@@ -60,7 +60,7 @@ public class NonBlockingTask extends AbstractTask {
     }
 
     @Override
-    public void onCancel() {
+    public void onCancel() throws Exception {
 
     }
 
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
index 4fc32ca..6133b26 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
@@ -80,15 +80,6 @@ public class TaskUtil {
                 }
             }
         }
-
-        for (Field classField : allFields) {
-            TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
-            if (outPort != null) {
-                classField.setAccessible(true);
-                OutPort op = new OutPort();
-                op.setNextTaskId(params.get(outPort.name()));
-            }
-        }
     }
 
     public static <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
@@ -111,14 +102,6 @@ public class TaskUtil {
                     logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName());
                     throw e;
                 }
-
-                TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
-                if (outPort != null) {
-                    classField.setAccessible(true);
-                    if (classField.get(data) != null) {
-                        result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString());
-                    }
-                }
             }
         }
         return result;
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
new file mode 100644
index 0000000..d21a91d
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.async.ResultCallback;
+import com.github.dockerjava.api.command.CreateContainerResponse;
+import com.github.dockerjava.api.command.LogContainerCmd;
+import com.github.dockerjava.api.command.PullImageResultCallback;
+import com.github.dockerjava.api.command.WaitContainerResultCallback;
+import com.github.dockerjava.api.model.Bind;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.api.model.StreamType;
+import com.github.dockerjava.core.DefaultDockerClientConfig;
+import com.github.dockerjava.core.DockerClientBuilder;
+import com.github.dockerjava.core.command.LogContainerResultCallback;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.parsing.*;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.types.StringMap;
+import org.apache.commons.io.IOUtils;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@BlockingTaskDef(name = "GenericDataParsingTask")
+public class GenericDataParsingTask extends BlockingTask {
+
+    private final static Logger logger = LoggerFactory.getLogger(GenericDataParsingTask.class);
+
+    @TaskParam(name = "ParserID")
+    private ThreadLocal<String> parserId = new ThreadLocal<>();
+
+    @TaskParam(name = "InputMapping")
+    private ThreadLocal<StringMap> inputMapping = new ThreadLocal<>();
+
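+    // Fetches the parser definition, stages inputs into a temp directory, runs the parser's
+    // Docker image with the input/output directories bind-mounted, and publishes the outputs
+    // to the workflow-scoped context for downstream tasks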
+    @Override
+    public TaskResult runBlockingCode() {
+
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
+        DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
+        ParserFetchResponse parserFetchResponse = parserClient
+                .fetchParser(ParserFetchRequest.newBuilder()
+                        .setParserId(getParserId()).build());
+
+        DataParser parser = parserFetchResponse.getParser();
+        List<DataParserInputInterface> inputInterfaces = parser.getInputInterfacesList();
+
+        String tempWorkDir = "/tmp/" + UUID.randomUUID();
+        String tempInputDir = tempWorkDir + File.separator + "inputs";
+        String tempOutputDir = tempWorkDir + File.separator + "outputs";
+        logger.info("Using temp working directory {}", tempWorkDir);
+        try {
+            Files.createDirectory(Paths.get(tempWorkDir));
+            Files.createDirectory(Paths.get(tempInputDir));
+            Files.createDirectory(Paths.get(tempOutputDir));
+        } catch (IOException e) {
+            logger.error("Failed to create temp working directories in {}", tempWorkDir, e);
+            return new TaskResult(TaskResult.Status.FAILED, "Failed to create temp working directories");
+        }
+
+        for (DataParserInputInterface dpi: inputInterfaces) {
+            String path = getInputMapping().get(dpi.getParserInputInterfaceId());
+            if (path == null) {
+                logger.error("No value specified for input {}", dpi.getParserInputInterfaceId());
+                return new TaskResult(TaskResult.Status.FAILED, "No value specified for input");
+            }
+
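+            // Values starting with $ reference entries that upstream tasks published to the workflow-scoped context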
+            if (path.startsWith("$")) {
+                path = getUserContent(path.substring(1), Scope.WORKFLOW);
+                if (path == null) {
+                    logger.error("No value in context to path {} for {}", path, dpi.getParserInputInterfaceId());
+                    return new TaskResult(TaskResult.Status.FAILED, "No value specified in context for path");
+                }
+            }
+
+            try {
+                Files.copy(Paths.get(path), Paths.get(tempInputDir + File.separator + dpi.getInputName()));
+                logger.info("Copied input file from path {} to {}", path, tempInputDir + File.separator + dpi.getInputName());
+            } catch (IOException e) {
+                logger.error("Failed to copy the input from path {} to {}", path, tempInputDir);
+                return new TaskResult(TaskResult.Status.FAILED, "Failed to copy the input");
+            }
+        }
+
+        try {
+            runContainer(parser, tempInputDir, tempOutputDir, new HashMap<>());
+            exportOutputs(parser, tempOutputDir);
+        } catch (Exception e) {
+            logger.error("Failed to execute the container for task {}", getTaskId());
+            return new TaskResult(TaskResult.Status.FAILED, "Failed to execute the container");
+        }
+        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+    }
+
+    private void exportOutputs(DataParser parser, String outputPath) {
+        for (DataParserOutputInterface dpoi: parser.getOutputInterfacesList()) {
+            putUserContent(getTaskId() + "-" + dpoi.getOutputName(),
+                    outputPath + File.separator + dpoi.getOutputName(),
+                    Scope.WORKFLOW);
+        }
+    }
+
+    private void runContainer(DataParser parser, String inputPath, String outputPath, Map<String, String> environmentValues)
+            throws Exception{
+
+        DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();
+        DockerClient dockerClient = DockerClientBuilder.getInstance(config.build()).build();
+
+        logger.info("Pulling image " + parser.getDockerImage());
+        try {
+            dockerClient.pullImageCmd(parser.getDockerImage().split(":")[0])
+                    .withTag(parser.getDockerImage().split(":")[1])
+                    .exec(new PullImageResultCallback()).awaitCompletion();
+        } catch (InterruptedException e) {
+            logger.error("Interrupted while pulling image", e);
+            throw e;
+        }
+
+        logger.info("Successfully pulled image " + parser.getDockerImage());
+
+        String containerId = UUID.randomUUID().toString();
+        String[] commands = parser.getExecCommand().split(" ");
+        CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parser.getDockerImage()).withCmd(commands).withName(containerId)
+                .withBinds(Bind.parse(inputPath + ":" + parser.getInputPath()),
+                        Bind.parse(outputPath + ":" + parser.getOutputPath()))
+                .withTty(true)
+                .withAttachStdin(true)
+                .withAttachStdout(true).withEnv(environmentValues.entrySet()
+                        .stream()
+                        .map(entry -> entry.getKey() + "=" + entry.getValue())
+                        .collect(Collectors.toList()))
+                .exec();
+
+        logger.info("Created the container with id " + containerResponse.getId());
+
+        final StringBuilder dockerLogs = new StringBuilder();
+
+        if (containerResponse.getWarnings() != null && containerResponse.getWarnings().length > 0) {
+            StringBuilder warningStr = new StringBuilder();
+            for (String w : containerResponse.getWarnings()) {
+                warningStr.append(w).append(",");
+            }
+            logger.warn("Container " + containerResponse.getId() + " warnings : " + warningStr);
+        } else {
+            logger.info("Starting container with id " + containerResponse.getId());
+            dockerClient.startContainerCmd(containerResponse.getId()).exec();
+            LogContainerCmd logContainerCmd = dockerClient.logContainerCmd(containerResponse.getId()).withStdOut(true).withStdErr(true);
+
+            try {
+
+                logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
+                    @Override
+                    public void onNext(Frame item) {
+                        logger.info("Got frame: {}", item);;
+                        if (item.getStreamType() == StreamType.STDOUT) {
+                            dockerLogs.append(new String(item.getPayload(), StandardCharsets.UTF_8));
+                            dockerLogs.append("\n");
+                        } else if (item.getStreamType() == StreamType.STDERR) {
+                            dockerLogs.append(new String(item.getPayload(), StandardCharsets.UTF_8));
+                            dockerLogs.append("\n");
+                        }
+                        super.onNext(item);
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        logger.error("Errored while running the container {}", containerId, throwable);
+                        super.onError(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        logger.info("Container {} successfully completed", containerId);
+                        super.onComplete();
+                    }
+                }).awaitCompletion();
+            } catch (InterruptedException e) {
+                logger.error("Interrupted while reading container log" + e.getMessage());
+                throw e;
+            }
+
+            logger.info("Waiting for the container to stop");
+
+            Integer statusCode = dockerClient.waitContainerCmd(containerResponse.getId()).exec(new WaitContainerResultCallback()).awaitStatusCode();
+            logger.info("Container " + containerResponse.getId() + " exited with status code " + statusCode);
+            if (statusCode != 0) {
+                logger.error("Failing as non-zero status code {} was returned", statusCode);
+                throw new Exception("Container exited with non-zero status code " + statusCode);
+            }
+
+            logger.info("Container logs " + dockerLogs.toString());
+        }
+    }
+
+    public String getParserId() {
+        return parserId.get();
+    }
+
+    public void setParserId(String parserId) {
+        this.parserId.set(parserId);
+    }
+
+    public StringMap getInputMapping() {
+        return inputMapping.get();
+    }
+
+    public void setInputMapping(StringMap inputMapping) {
+        this.inputMapping.set(inputMapping);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
new file mode 100644
index 0000000..7bb9489
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import com.google.protobuf.Struct;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.storage.AddResourceMetadataRequest;
+import org.apache.airavata.datalake.drms.storage.ResourceServiceGrpc;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Base64;
+
+@BlockingTaskDef(name = "MetadataPersistTask")
+public class MetadataPersistTask extends BlockingTask {
+
+    private final static Logger logger = LoggerFactory.getLogger(MetadataPersistTask.class);
+
+    @TaskParam(name = "JSON_FILE")
+    private final ThreadLocal<String> jsonFile = new ThreadLocal<>();
+
+    @TaskParam(name = "DRMS_HOST")
+    private final ThreadLocal<String> drmsHost = new ThreadLocal<>();
+
+    @TaskParam(name = "DRMS_PORT")
+    private final ThreadLocal<Integer> drmsPort = new ThreadLocal<>();
+
+    @TaskParam(name = "RESOURCE_ID")
+    private final ThreadLocal<String> resourceId = new ThreadLocal<>();
+
+    @TaskParam(name = "DRMS_SERVICE_ACCOUNT_KEY")
+    private final ThreadLocal<String> serviceAccountKey = new ThreadLocal<>();
+
+    @TaskParam(name = "DRMS_SERVICE_ACCOUNT_SECRET")
+    private final ThreadLocal<String> serviceAccountSecret = new ThreadLocal<>();
+
+    @TaskParam(name = "USER")
+    private final ThreadLocal<String> user = new ThreadLocal<>();
+
+    @TaskParam(name = "TENANT")
+    private final ThreadLocal<String> tenant = new ThreadLocal<>();
+
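+    // Reads the parser-produced JSON file (resolving $-prefixed workflow context references),
+    // converts it to a protobuf Struct, and attaches it as metadata to the resource in DRMS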
+    @Override
+    public TaskResult runBlockingCode() throws Exception {
+
+        ManagedChannel channel = ManagedChannelBuilder
+                .forAddress(getDrmsHost(), getDrmsPort()).usePlaintext().build();
+
+        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(Base64.getEncoder()
+                        .encodeToString((getServiceAccountKey() + ":" + getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)))
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(getUser())
+                        .setTenantId(getTenant())
+                        .build())
+                .build();
+
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceGrpc.newBlockingStub(channel);
+
+        String derivedFilePath = getJsonFile();
+        if (derivedFilePath.startsWith("$")) {
+            derivedFilePath = getUserContent(derivedFilePath.substring(1), Scope.WORKFLOW);
+        }
+
+        logger.info("Using json file {}", derivedFilePath);
+
+        String jsonString = new String(Files.readAllBytes(Path.of(derivedFilePath)));
+        //logger.info("Json Content {}", jsonString);
+
+        Struct.Builder structBuilder = Struct.newBuilder();
+        JsonFormat.parser().merge(jsonString, structBuilder);
+
+        logger.info("Adding metadata to resource {}", getResourceId());
+        resourceClient.addResourceMetadata(AddResourceMetadataRequest.newBuilder()
+                .setResourceId(getResourceId())
+                .setAuthToken(serviceAuthToken)
+                .setType("FILE")
+                .setMetadata(structBuilder.build()).build());
+        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+    }
+
+    public String getJsonFile() {
+        return jsonFile.get();
+    }
+
+    public void setJsonFile(String jsonFile) {
+        this.jsonFile.set(jsonFile);
+    }
+
+    public String getDrmsHost() {
+        return drmsHost.get();
+    }
+
+    public void setDrmsHost(String drmsHost) {
+        this.drmsHost.set(drmsHost);
+    }
+
+    public int getDrmsPort() {
+        return drmsPort.get();
+    }
+
+    public void setDrmsPort(int drmsPort) {
+        this.drmsPort.set(drmsPort);
+    }
+
+    public String getResourceId() {
+        return resourceId.get();
+    }
+
+    public void setResourceId(String resourceId) {
+        this.resourceId.set(resourceId);
+    }
+
+    public String getServiceAccountKey() {
+        return serviceAccountKey.get();
+    }
+
+    public void setServiceAccountKey(String serviceAccountKey) {
+        this.serviceAccountKey.set(serviceAccountKey);
+    }
+
+    public String getServiceAccountSecret() {
+        return serviceAccountSecret.get();
+    }
+
+    public void setServiceAccountSecret(String serviceAccountSecret) {
+        this.serviceAccountSecret.set(serviceAccountSecret);
+    }
+
+    public String getUser() {
+        return user.get();
+    }
+
+    public void setUser(String user) {
+        this.user.set(user);
+    }
+
+    public String getTenant() {
+        return tenant.get();
+    }
+
+    public void setTenant(String tenant) {
+        this.tenant.set(tenant);
+    }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
new file mode 100644
index 0000000..eff1e69
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.*;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+
+@BlockingTaskDef(name = "SyncLocalDataDownloadTask")
+public class SyncLocalDataDownloadTask extends BlockingTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(SyncLocalDataDownloadTask.class);
+
+    @TaskParam(name = "MFTAPIHost")
+    private final ThreadLocal<String> mftHost = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTAPIPort")
+    private final ThreadLocal<Integer> mftPort = new ThreadLocal<>();
+
+    // Security
+    @TaskParam(name = "UserId")
+    private final ThreadLocal<String> userId = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTClientId")
+    private final ThreadLocal<String> mftClientId = new ThreadLocal<>();
+
+    @TaskParam(name = "MFTClientSecret")
+    private final ThreadLocal<String> mftClientSecret = new ThreadLocal<>();
+
+    @TaskParam(name = "TenantId")
+    private final ThreadLocal<String> tenantId = new ThreadLocal<>();
+
+    @TaskParam(name = "SourceResourceId")
+    private final ThreadLocal<String> sourceResourceId = new ThreadLocal<>();
+
+    @TaskParam(name = "SourceCredToken")
+    private final ThreadLocal<String> sourceCredToken = new ThreadLocal<>();
+
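+    // Manual test harness with sample endpoint and credential values for local runs;
+    // not exercised by the workflow engine and not intended for production use.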
+    public static void main(String[] args) {
+
+        SyncLocalDataDownloadTask task = new SyncLocalDataDownloadTask();
+        task.setMftClientId("mft-agent");
+        task.setMftClientSecret("kHqH27BloDCbLvwUA8ZYRlHcJxXZyby9PB90bTdU");
+        task.setUserId("isjarana");
+        task.setTenantId("custos-ii8g0cfwsz6ruwezykn9-10002640");
+        task.setMftHost("149.165.157.235");
+        task.setMftPort(7004);
+        task.setSourceResourceId("46c7659360f8bdb1473fc16572d98a8611cfd7bb2296660dcd917ed4b6d33c13");
+        task.setSourceCredToken("ed1af924-2a91-412c-bedf-d9321252456d");
+
+        task.runBlockingCode();
+    }
+
+    @Override
+    public TaskResult runBlockingCode() {
+
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(getMftHost(), getMftPort());
+
+        DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+                .setUserId(getUserId())
+                .setClientId(getMftClientId())
+                .setClientSecret(getMftClientSecret())
+                .putProperties("TENANT_ID", getTenantId()).build();
+
+        HttpDownloadApiResponse httpDownloadApiResponse = mftClient.submitHttpDownload(HttpDownloadApiRequest
+                .newBuilder()
+                .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build())
+                .setSourceResourceId(getSourceResourceId())
+                .setSourceToken(getSourceCredToken())
+                .setSourceType("SCP")
+                .setSourceResourceChildPath("")
+                .build());
+
+        FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
+                .setResourceType("SCP")
+                .setResourceId(getSourceResourceId())
+                .setResourceToken(getSourceCredToken())
+                .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
+
+        String downloadUrl = httpDownloadApiResponse.getUrl();
+        logger.info("Using download URL {}", downloadUrl);
+
+        String downloadPath = "/tmp/" + metadata.getFriendlyName();
+        try (BufferedInputStream in = new BufferedInputStream(new URL(downloadUrl).openStream());
+             FileOutputStream fileOutputStream = new FileOutputStream(downloadPath)) {
+            byte[] dataBuffer = new byte[1024];
+            int bytesRead;
+            while ((bytesRead = in.read(dataBuffer, 0, 1024)) != -1) {
+                fileOutputStream.write(dataBuffer, 0, bytesRead);
+            }
+        } catch (IOException e) {
+            logger.error("Failed to download file", e);
+            return new TaskResult(TaskResult.Status.FAILED, "Failed to download file");
+        }
+
+        logger.info("Downloaded to path {}", downloadPath);
+
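+        // Publish the local path into workflow scope so downstream tasks can
+        // resolve it through the "$DOWNLOAD_PATH" indirection.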
+        putUserContent("DOWNLOAD_PATH", downloadPath, Scope.WORKFLOW);
+        return new TaskResult(TaskResult.Status.COMPLETED, "Success");
+    }
+
+    public String getSourceResourceId() {
+        return sourceResourceId.get();
+    }
+
+    public void setSourceResourceId(String sourceResourceId) {
+        this.sourceResourceId.set(sourceResourceId);
+    }
+
+    public String getSourceCredToken() {
+        return sourceCredToken.get();
+    }
+
+    public void setSourceCredToken(String sourceCredToken) {
+        this.sourceCredToken.set(sourceCredToken);
+    }
+
+    public String getUserId() {
+        return userId.get();
+    }
+
+    public void setUserId(String userId) {
+        this.userId.set(userId);
+    }
+
+    public String getMftHost() {
+        return mftHost.get();
+    }
+
+    public void setMftHost(String mftHost) {
+        this.mftHost.set(mftHost);
+    }
+
+    public Integer getMftPort() {
+        return mftPort.get();
+    }
+
+    public void setMftPort(Integer mftPort) {
+        this.mftPort.set(mftPort);
+    }
+
+    public String getMftClientId() {
+        return mftClientId.get();
+    }
+
+    public void setMftClientId(String mftClientId) {
+        this.mftClientId.set(mftClientId);
+    }
+
+    public String getMftClientSecret() {
+        return mftClientSecret.get();
+    }
+
+    public void setMftClientSecret(String mftClientSecret) {
+        this.mftClientSecret.set(mftClientSecret);
+    }
+
+    public String getTenantId() {
+        return tenantId.get();
+    }
+
+    public void setTenantId(String tenantId) {
+        this.tenantId.set(tenantId);
+    }
+}
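
The manual buffer loop above works, but the same copy can be written more compactly
with java.nio (a sketch, assuming Java 11+ for Path.of; needs java.io.InputStream,
java.net.URL, and java.nio.file.*):

    try (InputStream in = new URL(downloadUrl).openStream()) {
        Files.copy(in, Path.of(downloadPath), StandardCopyOption.REPLACE_EXISTING);
    }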
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/types/StringMap.java
similarity index 66%
copy from data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/types/StringMap.java
index 9033f1a..4f210a7 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/types/StringMap.java
@@ -15,28 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.types;
 
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
 
-public abstract class BlockingTask extends AbstractTask {
+import java.util.HashMap;
 
-    private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
-    public BlockingTask() {
-    }
+public class StringMap extends HashMap<String, String> implements TaskParamType {
 
     @Override
-    public TaskResult onRun() {
-        return runBlockingCode();
+    public String serialize() {
+        return new Gson().toJson(this);
     }
 
-    public abstract TaskResult runBlockingCode();
-
     @Override
-    public void onCancel() {
-
+    public void deserialize(String content) {
+        StringMap stringMap = new Gson().fromJson(content, StringMap.class);
+        this.putAll(stringMap);
     }
 }
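
A quick round trip illustrates the contract (key and value are hypothetical):

    StringMap params = new StringMap();
    params.put("sourceResourceId", "res-1");
    String json = params.serialize();         // -> {"sourceResourceId":"res-1"}

    StringMap restored = new StringMap();
    restored.deserialize(json);
    assert "res-1".equals(restored.get("sourceResourceId"));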
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java
index 1b9e761..4415dd2 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java
@@ -222,8 +222,8 @@ public class WorkflowOperator {
                 TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
                 if (outPortAnnotation != null) {
                     field.setAccessible(true);
-                    OutPort outPort = (OutPort) field.get(taskObj);
-                    outPorts.add(outPort);
+                    List<OutPort> outPort = (List<OutPort>) field.get(taskObj);
+                    outPorts.addAll(outPort);
                 }
             }
         }
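
With this change a task declares its fan-out as a list rather than a single port, so
one annotated field can feed several downstream tasks. A sketch of the expected field
shape (assuming the annotation takes no required attributes):

    @TaskOutPort
    private final List<OutPort> outPorts = new ArrayList<>();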
diff --git a/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml b/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml
index 7c4394a..1c167e0 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml
+++ b/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml
@@ -1,6 +1,9 @@
 tasks:
   blocking:
     - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask
+    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.SyncLocalDataDownloadTask
+    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.GenericDataParsingTask
+    - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.MetadataPersistTask
   nonBlocking:
     - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
     - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask
diff --git a/data-resource-management-service/drms-api/pom.xml b/data-resource-management-service/drms-api/pom.xml
index bd63411..be1b2d3 100644
--- a/data-resource-management-service/drms-api/pom.xml
+++ b/data-resource-management-service/drms-api/pom.xml
@@ -53,6 +53,12 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter</artifactId>
             <version>${spring.boot.data.jpa}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-to-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>net.sf.dozer</groupId>
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index 625ba8b..9c7e445 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -761,7 +761,8 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
             responseObserver.onCompleted();
         } catch (Exception ex) {
             String msg = " Error occurred while adding resource metadata " + ex.getMessage();
-            logger.error(" Error occurred while adding resource metadata: Messages {} ", ex.getMessage(), ex);
+            // Logging the full throwable trips https://github.com/neo4j/neo4j-java-driver/issues/773,
+            // so only the exception message is recorded here.
+            logger.error("Error occurred while adding resource metadata: {}", ex.getMessage());
             responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
         }
     }
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 57858f9..7692b2c 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -215,6 +215,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
     @Override
     public void searchStoragePreference(StoragePreferenceSearchRequest request, StreamObserver<StoragePreferenceSearchResponse> responseObserver) {
         try {
+            // TODO: temporary debug output; replace with a logger call before release
+            System.out.println(request);
             AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
 
             Map<String, Object> userProps = new HashMap<>();
diff --git a/pom.xml b/pom.xml
index 2c4f041..5dcb453 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,7 +135,7 @@
 
         <maven.surefile.plugin.version>3.0.0-M4</maven.surefile.plugin.version>
 
-        <log.back.version>1.2.3</log.back.version>
+        <log.back.version>1.2.5</log.back.version>
 
         <com.jcraft.version>0.1.55</com.jcraft.version>
 
@@ -143,8 +143,8 @@
 
         <com.codahale.version>0.7.0</com.codahale.version>
 
-        <neo4j.ogm.version>3.2.20</neo4j.ogm.version>
-        <neo4j.version>3.4.6</neo4j.version>
+        <neo4j.ogm.version>3.2.25</neo4j.ogm.version>
+        <neo4j.version>4.3.2</neo4j.version>
         <io.grpc.version>1.25.0</io.grpc.version>
         <spring-security.version>5.3.4.RELEASE</spring-security.version>
         <kafka-clients.version>1.0.0</kafka-clients.version>
@@ -153,6 +153,9 @@
         <yaml.version>1.15</yaml.version>
         <spring.boot.version>2.4.5</spring.boot.version>
         <commons.beanutils.version>1.9.4</commons.beanutils.version>
+        <docker.java.version>3.2.11</docker.java.version>
+        <jackson.version>2.11.4</jackson.version>
+        <commons.io.version>2.6</commons.io.version>
     </properties>
 
 </project>