You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/08/07 21:12:40 UTC
[airavata-data-lake] branch master updated: Integrating data
parsing workflow
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new cdbea12 Integrating data parsing workflow
cdbea12 is described below
commit cdbea1266872de8c83d0e592750b1a2fc77e2675
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Aug 7 17:12:26 2021 -0400
Integrating data parsing workflow
---
ansible/roles/mft/tasks/main.yml | 1 +
.../mft/templates/secret-service/secrets.json.j2 | 2 +-
.../src/main/dist/bin/orch-api-server-daemon.sh | 4 +-
.../src/main/dist/bin/orch-api-server.sh | 2 +-
...ializer.java => DataOrchestratorAPIRunner.java} | 6 +-
.../handlers/grpc/DataParserApiHandler.java | 48 +++--
.../src/main/proto/parsing.proto | 12 ++
.../workflow/engine/wm/datasync/AppConfig.java | 5 +
.../wm/datasync/DataParsingWorkflowManager.java | 202 +++++++++++++++++
.../wm/datasync/WorkflowEngineAPIHandler.java | 7 +-
.../{APIRunner.java => WorkflowManagerRunner.java} | 6 +-
.../workflow-engine/workflow-engine-core/pom.xml | 30 +++
.../workflow/engine/task/AbstractTask.java | 20 +-
.../workflow/engine/task/BlockingTask.java | 6 +-
.../workflow/engine/task/NonBlockingTask.java | 4 +-
.../workflow/engine/task/TaskUtil.java | 17 --
.../engine/task/impl/GenericDataParsingTask.java | 240 +++++++++++++++++++++
.../engine/task/impl/MetadataPersistTask.java | 173 +++++++++++++++
.../task/impl/SyncLocalDataDownloadTask.java | 193 +++++++++++++++++
.../{BlockingTask.java => types/StringMap.java} | 25 +--
.../workflow/engine/wm/WorkflowOperator.java | 4 +-
.../src/main/resources/task-list.yaml | 3 +
data-resource-management-service/drms-api/pom.xml | 6 +
.../drms/api/handlers/ResourceServiceHandler.java | 3 +-
.../handlers/StoragePreferenceServiceHandler.java | 2 +
pom.xml | 9 +-
26 files changed, 953 insertions(+), 77 deletions(-)
diff --git a/ansible/roles/mft/tasks/main.yml b/ansible/roles/mft/tasks/main.yml
index f47ceb0..dc6e243 100644
--- a/ansible/roles/mft/tasks/main.yml
+++ b/ansible/roles/mft/tasks/main.yml
@@ -18,6 +18,7 @@
firewalld: port="{{ item }}/tcp"
zone=public permanent=true state=enabled immediate=yes
with_items:
+ - "{{ mft_api_service_grpc_port }}"
- "{{ mft_default_agent_port }}"
- "{{ mft_consul_port }}"
- "{{ mft_resource_service_grpc_port }}"
diff --git a/ansible/roles/mft/templates/secret-service/secrets.json.j2 b/ansible/roles/mft/templates/secret-service/secrets.json.j2
index 30dcd0e..bfa8b53 100644
--- a/ansible/roles/mft/templates/secret-service/secrets.json.j2
+++ b/ansible/roles/mft/templates/secret-service/secrets.json.j2
@@ -1,7 +1,7 @@
[
{
"type": "SCP",
- "secretId": "ssh_storage_preference_emc_source",
+ "secretId": "ed1af924-2a91-412c-bedf-d9321252456d",
"user": "ubuntu",
"privateKey": "{{ vault_mft_agent_default_ssh_private_key }}",
"publicKey": "{{ vault_mft_agent_default_ssh_public_key }}",
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh
index 81d17fe..70a4333 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server-daemon.sh
@@ -41,7 +41,7 @@ case $1 in
echo "Starting $SERVICE_NAME ..."
if [ ! -f $PID_PATH_NAME ]; then
nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
- org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+ org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
echo $! > $PID_PATH_NAME
echo "$SERVICE_NAME started ..."
else
@@ -90,7 +90,7 @@ case $1 in
rm $PID_PATH_NAME
echo "$SERVICE_NAME starting ..."
nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
- org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+ org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
echo $! > $PID_PATH_NAME
echo "$SERVICE_NAME started ..."
else
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh
index d146413..b1d0221 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/dist/bin/orch-api-server.sh
@@ -67,5 +67,5 @@ do
done
java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
- org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $*
+ org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $*
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/APIServerInitializer.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
similarity index 94%
rename from data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/APIServerInitializer.java
rename to data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
index e687e1f..7fcefc3 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/APIServerInitializer.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
@@ -43,8 +43,8 @@ import java.io.InputStream;
@EnableJpaAuditing
@EnableJpaRepositories("org.apache.airavata.datalake")
@EntityScan("org.apache.airavata.datalake")
-public class APIServerInitializer implements CommandLineRunner {
- private static final Logger LOGGER = LoggerFactory.getLogger(APIServerInitializer.class);
+public class DataOrchestratorAPIRunner implements CommandLineRunner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataOrchestratorAPIRunner.class);
@Autowired
private OrchestratorEventHandler orchestratorEventHandler;
@@ -53,7 +53,7 @@ public class APIServerInitializer implements CommandLineRunner {
private String configPath;
public static void main(String[] args) {
- SpringApplication.run(APIServerInitializer.class, args);
+ SpringApplication.run(DataOrchestratorAPIRunner.class, args);
}
@Override
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java
index 425cc96..30d2dd1 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/DataParserApiHandler.java
@@ -28,6 +28,7 @@ import org.lognet.springboot.grpc.GRpcService;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
+import java.util.Optional;
@GRpcService
public class DataParserApiHandler extends DataParserServiceGrpc.DataParserServiceImplBase {
@@ -50,26 +51,13 @@ public class DataParserApiHandler extends DataParserServiceGrpc.DataParserServic
@Override
public void listParsers(ParserListRequest request, StreamObserver<ParserListResponse> responseObserver) {
- DozerBeanMapper mapper = new DozerBeanMapper();
ParserListResponse.Builder response = ParserListResponse.newBuilder();
List<DataParserEntity> allParsers = parserRepo.findAll();
allParsers.forEach(dataParserEntity -> {
- DataParser.Builder parserBuilder = DataParser.newBuilder();
- mapper.map(dataParserEntity, parserBuilder);
- dataParserEntity.getInputInterfacesList().forEach(dataParserInputInterfaceEntity -> {
- DataParserInputInterface.Builder inputBuilder = DataParserInputInterface.newBuilder();
- mapper.map(dataParserInputInterfaceEntity, inputBuilder);
- parserBuilder.addInputInterfaces(inputBuilder);
- });
- dataParserEntity.getOutputInterfacesList().forEach(dataParserOutputInterfaceEntity -> {
- DataParserOutputInterface.Builder outputBuilder = DataParserOutputInterface.newBuilder();
- mapper.map(dataParserOutputInterfaceEntity, outputBuilder);
- parserBuilder.addOutputInterfaces(outputBuilder);
- });
- response.addParsers(parserBuilder);
+ response.addParsers(mapParser(dataParserEntity));
});
responseObserver.onNext(response.build());
@@ -77,6 +65,38 @@ public class DataParserApiHandler extends DataParserServiceGrpc.DataParserServic
}
@Override
+ public void fetchParser(ParserFetchRequest request, StreamObserver<ParserFetchResponse> responseObserver) {
+ Optional<DataParserEntity> entityOp = this.parserRepo.findById(request.getParserId());
+ if (entityOp.isPresent()) {
+ responseObserver.onNext(ParserFetchResponse.newBuilder().setParser(mapParser(entityOp.get())).build());
+ responseObserver.onCompleted();
+
+ } else {
+ responseObserver.onError(new Exception("Couldn't find a parser with id " + request.getParserId()));
+ }
+ }
+
+ private DataParser.Builder mapParser(DataParserEntity dataParserEntity) {
+ DozerBeanMapper mapper = new DozerBeanMapper();
+
+ DataParser.Builder parserBuilder = DataParser.newBuilder();
+ mapper.map(dataParserEntity, parserBuilder);
+ dataParserEntity.getInputInterfacesList().forEach(dataParserInputInterfaceEntity -> {
+ DataParserInputInterface.Builder inputBuilder = DataParserInputInterface.newBuilder();
+ mapper.map(dataParserInputInterfaceEntity, inputBuilder);
+ parserBuilder.addInputInterfaces(inputBuilder);
+ });
+
+ dataParserEntity.getOutputInterfacesList().forEach(dataParserOutputInterfaceEntity -> {
+ DataParserOutputInterface.Builder outputBuilder = DataParserOutputInterface.newBuilder();
+ mapper.map(dataParserOutputInterfaceEntity, outputBuilder);
+ parserBuilder.addOutputInterfaces(outputBuilder);
+ });
+
+ return parserBuilder;
+ }
+
+ @Override
public void registerParsingJob(ParsingJobRegisterRequest request, StreamObserver<ParsingJobRegisterResponse> responseObserver) {
DozerBeanMapper mapper = new DozerBeanMapper();
DataParsingJobEntity entity = mapper.map(request.getParsingJob(), DataParsingJobEntity.class);
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
index 635e261..df6edf7 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
@@ -77,6 +77,14 @@ message ParserRegisterResponse {
string parserId = 1;
}
+message ParserFetchRequest {
+ string parserId = 1;
+}
+
+message ParserFetchResponse {
+ DataParser parser = 1;
+}
+
message ParserListRequest {
}
@@ -103,6 +111,10 @@ message ParsingJobListResponse {
service DataParserService {
+
+ rpc fetchParser (ParserFetchRequest) returns (ParserFetchResponse) {
+ }
+
rpc registerParser (ParserRegisterRequest) returns (ParserRegisterResponse) {
}
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java
index b91922a..f7c976f 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/AppConfig.java
@@ -33,6 +33,11 @@ public class AppConfig {
return new DataSyncWorkflowManager();
}
+ @Bean(initMethod = "init")
+ public DataParsingWorkflowManager dataParsingWorkflowManager() {
+ return new DataParsingWorkflowManager();
+ }
+
@Bean
public CallbackWorkflowStore callbackWorkflowStore() {
return new CallbackWorkflowStore();
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
new file mode 100644
index 0000000..75eff4c
--- /dev/null
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.workflow.engine.wm.datasync;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.parsing.*;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.GenericDataParsingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.MetadataPersistTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.SyncLocalDataDownloadTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.types.StringMap;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.wm.WorkflowOperator;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
+import org.apache.airavata.mft.api.service.FileMetadataResponse;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.script.*;
+import java.util.*;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class DataParsingWorkflowManager {
+ private final static Logger logger = LoggerFactory.getLogger(DataParsingWorkflowManager.class);
+
+ @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+ private String clusterName;
+
+ @org.springframework.beans.factory.annotation.Value("${parsing.wm.name}")
+ private String workflowManagerName;
+
+ @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+ private String zkAddress;
+
+ @org.springframework.beans.factory.annotation.Value("${mft.host}")
+ private String mftHost;
+
+ @org.springframework.beans.factory.annotation.Value("${mft.port}")
+ private int mftPort;
+
+ @org.springframework.beans.factory.annotation.Value("${drms.host}")
+ private String drmsHost;
+
+ @org.springframework.beans.factory.annotation.Value("${drms.port}")
+ private int drmsPort;
+
+ private String mftClientId = "mft-agent";
+
+ private String mftClientSecret = "kHqH27BloDCbLvwUA8ZYRlHcJxXZyby9PB90bTdU";
+
+
+ @Autowired
+ private CallbackWorkflowStore callbackWorkflowStore;
+
+ private WorkflowOperator workflowOperator;
+
+ public void init() throws Exception {
+ workflowOperator = new WorkflowOperator();
+ workflowOperator.init(clusterName, workflowManagerName, zkAddress, callbackWorkflowStore);
+ logger.info("Successfully initialized Data Parsing Workflow Manager");
+ }
+
+ public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws Exception {
+
+ WorkflowMessage workflowMessage = request.getMessage();
+ logger.info("Processing parsing workflow for resource {}", workflowMessage.getSourceResourceId());
+
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);
+
+ DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+ .setUserId(workflowMessage.getUsername())
+ .setClientId(mftClientId)
+ .setClientSecret(mftClientSecret)
+ .putProperties("TENANT_ID", workflowMessage.getTenantId()).build();
+
+ FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
+ .setResourceType("SCP")
+ .setResourceId(workflowMessage.getSourceResourceId())
+ .setResourceToken(workflowMessage.getSourceCredentialToken())
+ .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
+
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
+ DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
+
+ ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
+
+ Map<String, StringMap> parserInputMappings = new HashMap<>();
+ List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
+ List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
+
+ boolean match = true;
+ StringMap stringMap = new StringMap();
+ for (DataParsingJobInput pji : pjis) {
+
+ ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
+ Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
+ bindings.put("polyglot.js.allowHostAccess", true);
+ bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
+ bindings.put("metadata", metadata);
+ try {
+ Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
+ stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
+ match = match && eval;
+ } catch (ScriptException e) {
+ logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
+ match = false;
+ }
+ }
+
+ if (match) {
+ parserInputMappings.put(pj.getParserId(), stringMap);
+ }
+ return match;
+ }).collect(Collectors.toList());
+
+ Map<String, AbstractTask> taskMap = new HashMap<>();
+
+ SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
+ downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
+ downloadTask.setMftClientId(mftClientId);
+ downloadTask.setMftClientSecret(mftClientSecret);
+ downloadTask.setUserId(workflowMessage.getUsername());
+ downloadTask.setTenantId(workflowMessage.getTenantId());
+ downloadTask.setMftHost(mftHost);
+ downloadTask.setMftPort(mftPort);
+ downloadTask.setSourceResourceId(workflowMessage.getSourceResourceId());
+ downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
+
+ taskMap.put(downloadTask.getTaskId(), downloadTask);
+
+ for(String parserId: parserInputMappings.keySet()) {
+
+ GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
+ dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
+ dataParsingTask.setParserId(parserId);
+ dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
+ taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
+
+ OutPort outPort = new OutPort();
+ outPort.setNextTaskId(dataParsingTask.getTaskId());
+ downloadTask.addOutPort(outPort);
+
+ DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj -> pj.getParserId().equals(parserId)).findFirst().get();
+ ParserFetchResponse parser = parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
+
+ for (DataParserOutputInterface dataParserOutputInterface: parser.getParser().getOutputInterfacesList()) {
+
+ Optional<DataParsingJobOutput> dataParsingJobOutput = dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
+ o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
+ .findFirst();
+
+ if (dataParsingJobOutput.isPresent() && dataParsingJobOutput.get().getOutputType().equals("JSON")) {
+ MetadataPersistTask mpt = new MetadataPersistTask();
+ mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
+ mpt.setDrmsHost(drmsHost);
+ mpt.setDrmsPort(drmsPort);
+ mpt.setTenant(workflowMessage.getTenantId());
+ mpt.setUser(workflowMessage.getUsername());
+ mpt.setServiceAccountKey(mftClientId);
+ mpt.setServiceAccountSecret(mftClientSecret);
+ mpt.setResourceId(workflowMessage.getSourceResourceId());
+ mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
+ OutPort dpOut = new OutPort();
+ dpOut.setNextTaskId(mpt.getTaskId());
+ dataParsingTask.addOutPort(dpOut);
+ taskMap.put(mpt.getTaskId(), mpt);
+ }
+ }
+
+ }
+
+ String[] startTaskIds = {downloadTask.getTaskId()};
+ String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
+
+ logger.info("Submitted workflow {} to parse resource {}", workflowId, workflowMessage.getSourceResourceId());
+ }
+}
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
index c51d0ef..29be8a4 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
@@ -40,11 +40,16 @@ public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServic
@Autowired
private DataSyncWorkflowManager dataSyncWorkflowManager;
+ @Autowired
+ private DataParsingWorkflowManager dataParsingWorkflowManager;
+
@Override
public void invokeWorkflow(WorkflowInvocationRequest request,
StreamObserver<WorkflowInvocationResponse> responseObserver) {
try {
- dataSyncWorkflowManager.submitDataSyncWorkflow(request);
+ logger.info("Invoking workflow executor for resource {}", request.getMessage().getSourceResourceId());
+ //dataSyncWorkflowManager.submitDataSyncWorkflow(request);
+ dataParsingWorkflowManager.submitDataParsingWorkflow(request);
responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
responseObserver.onCompleted();
} catch (Exception ex) {
diff --git a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/APIRunner.java b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowManagerRunner.java
similarity index 88%
rename from data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/APIRunner.java
rename to data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowManagerRunner.java
index b6af806..eacac7c 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/APIRunner.java
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowManagerRunner.java
@@ -20,12 +20,10 @@ package org.apache.airavata.datalake.workflow.engine.wm.datasync;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
-import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class,
@@ -33,9 +31,9 @@ import org.springframework.context.annotation.ComponentScan;
/*@EnableJpaAuditing
@EnableJpaRepositories("org.apache.airavata.datalake")
@EntityScan("org.apache.airavata.datalake")*/
-public class APIRunner implements CommandLineRunner {
+public class WorkflowManagerRunner implements CommandLineRunner {
public static void main(String[] args) {
- SpringApplication app = new SpringApplication(APIRunner.class);
+ SpringApplication app = new SpringApplication(WorkflowManagerRunner.class);
app.setWebApplicationType(WebApplicationType.NONE);
app.run(args);
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
index cd2e5cc..c4619ce 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
@@ -42,9 +42,18 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons.io.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
@@ -103,6 +112,27 @@
<artifactId>workflow-engine-client</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>data-orchestrator-api-stub</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.docker-java</groupId>
+ <artifactId>docker-java</artifactId>
+ <version>${docker.java.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
</dependencies>
<properties>
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
index 5573a6e..fbf2f57 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
@@ -46,7 +46,7 @@ public abstract class AbstractTask extends UserContentStore implements Task {
private BlockingQueue<TaskCallbackContext> callbackContextQueue = new LinkedBlockingQueue<>();
@TaskOutPort(name = "nextTask")
- private OutPort outPort;
+ private List<OutPort> outPorts = new ArrayList<>();
@TaskParam(name = "taskId")
private ThreadLocal<String> taskId = new ThreadLocal<>();
@@ -87,19 +87,23 @@ public abstract class AbstractTask extends UserContentStore implements Task {
@Override
public void cancel() {
- onCancel();
+ try {
+ onCancel();
+ } catch (Exception e) {
+ logger.error("Unknown error while cancelling task {}", getTaskId(), e);
+ }
}
- public abstract TaskResult onRun();
+ public abstract TaskResult onRun() throws Exception;
- public abstract void onCancel();
+ public abstract void onCancel() throws Exception;
- public OutPort getOutPort() {
- return outPort;
+ public List<OutPort> getOutPorts() {
+ return outPorts;
}
- public void setOutPort(OutPort outPort) {
- this.outPort = outPort;
+ public void addOutPort(OutPort outPort) {
+ this.outPorts.add(outPort);
}
public int getRetryCount() {
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
index 9033f1a..cec9c02 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
@@ -29,14 +29,14 @@ public abstract class BlockingTask extends AbstractTask {
}
@Override
- public TaskResult onRun() {
+ public TaskResult onRun() throws Exception {
return runBlockingCode();
}
- public abstract TaskResult runBlockingCode();
+ public abstract TaskResult runBlockingCode() throws Exception;
@Override
- public void onCancel() {
+ public void onCancel() throws Exception {
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
index 2afecea..abf63df 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
@@ -36,7 +36,7 @@ public class NonBlockingTask extends AbstractTask {
}
@Override
- public TaskResult onRun() {
+ public TaskResult onRun() throws Exception {
Class<?> c = this.getClass();
Method[] allMethods = c.getMethods();
for (Method method : allMethods) {
@@ -60,7 +60,7 @@ public class NonBlockingTask extends AbstractTask {
}
@Override
- public void onCancel() {
+ public void onCancel() throws Exception {
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
index 4fc32ca..6133b26 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskUtil.java
@@ -80,15 +80,6 @@ public class TaskUtil {
}
}
}
-
- for (Field classField : allFields) {
- TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
- if (outPort != null) {
- classField.setAccessible(true);
- OutPort op = new OutPort();
- op.setNextTaskId(params.get(outPort.name()));
- }
- }
}
public static <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
@@ -111,14 +102,6 @@ public class TaskUtil {
logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName());
throw e;
}
-
- TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class);
- if (outPort != null) {
- classField.setAccessible(true);
- if (classField.get(data) != null) {
- result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString());
- }
- }
}
}
return result;
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
new file mode 100644
index 0000000..d21a91d
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.async.ResultCallback;
+import com.github.dockerjava.api.command.CreateContainerResponse;
+import com.github.dockerjava.api.command.LogContainerCmd;
+import com.github.dockerjava.api.command.PullImageResultCallback;
+import com.github.dockerjava.api.command.WaitContainerResultCallback;
+import com.github.dockerjava.api.model.Bind;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.api.model.StreamType;
+import com.github.dockerjava.core.DefaultDockerClientConfig;
+import com.github.dockerjava.core.DockerClientBuilder;
+import com.github.dockerjava.core.command.LogContainerResultCallback;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.parsing.*;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.types.StringMap;
+import org.apache.commons.io.IOUtils;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@BlockingTaskDef(name = "GenericDataParsingTask")
+public class GenericDataParsingTask extends BlockingTask {
+
+ private final static Logger logger = LoggerFactory.getLogger(GenericDataParsingTask.class);
+
+ @TaskParam(name = "ParserID")
+ private ThreadLocal<String> parserId = new ThreadLocal<>();
+
+ @TaskParam(name = "InputMapping")
+ private ThreadLocal<StringMap> inputMapping = new ThreadLocal<>();
+
+ @Override
+ public TaskResult runBlockingCode() {
+
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
+ DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
+ ParserFetchResponse parserFetchResponse = parserClient
+ .fetchParser(ParserFetchRequest.newBuilder()
+ .setParserId(getParserId()).build());
+
+ DataParser parser = parserFetchResponse.getParser();
+ List<DataParserInputInterface> inputInterfaces = parser.getInputInterfacesList();
+
+ String tempWorkDir = "/tmp/" + UUID.randomUUID();
+ String tempInputDir = tempWorkDir + File.separator + "inputs";
+ String tempOutputDir = tempWorkDir + File.separator + "outputs";
+ logger.info("Using temp working directory {}", tempWorkDir);
+ try {
+ Files.createDirectory(Paths.get(tempWorkDir));
+ Files.createDirectory(Paths.get(tempInputDir));
+ Files.createDirectory(Paths.get(tempOutputDir));
+ } catch (IOException e) {
+ logger.error("Failed to create temp working directories in {}", tempWorkDir, e);
+ return new TaskResult(TaskResult.Status.FAILED, "Failed to create temp working directories");
+ }
+
+ for (DataParserInputInterface dpi: inputInterfaces) {
+ String path = getInputMapping().get(dpi.getParserInputInterfaceId());
+ if (path == null) {
+ logger.error("No value specified for input {}", dpi.getParserInputInterfaceId());
+ return new TaskResult(TaskResult.Status.FAILED, "No value specified for input");
+ }
+
+ if (path.startsWith("$")) {
+ path = getUserContent(path.substring(1), Scope.WORKFLOW);
+ if (path == null) {
+ logger.error("No value in context to path {} for {}", path, dpi.getParserInputInterfaceId());
+ return new TaskResult(TaskResult.Status.FAILED, "No value specified in context for path");
+ }
+ }
+
+ try {
+ Files.copy(Paths.get(path), Paths.get(tempInputDir + File.separator + dpi.getInputName()));
+ logger.info("Copied input file from path {} to {}", path, tempInputDir + File.separator + dpi.getInputName());
+ } catch (IOException e) {
+ logger.error("Failed to copy the input from path {} to {}", path, tempInputDir);
+ return new TaskResult(TaskResult.Status.FAILED, "Failed to copy the input");
+ }
+ }
+
+ try {
+ runContainer(parser, tempInputDir, tempOutputDir, new HashMap<>());
+ exportOutputs(parser, tempOutputDir);
+ } catch (Exception e) {
+ logger.error("Failed to execute the container for task {}", getTaskId());
+ return new TaskResult(TaskResult.Status.FAILED, "Failed to execute the container");
+ }
+ return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+ }
+
+ private void exportOutputs(DataParser parser, String outputPath) {
+ for (DataParserOutputInterface dpoi: parser.getOutputInterfacesList()) {
+ putUserContent(getTaskId() + "-" + dpoi.getOutputName(),
+ outputPath + File.separator + dpoi.getOutputName(),
+ Scope.WORKFLOW);
+ }
+ }
+
+ private void runContainer(DataParser parser, String inputPath, String outputPath, Map<String, String> environmentValues)
+ throws Exception{
+
+ DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();
+ DockerClient dockerClient = DockerClientBuilder.getInstance(config.build()).build();
+
+ logger.info("Pulling image " + parser.getDockerImage());
+ try {
+ dockerClient.pullImageCmd(parser.getDockerImage().split(":")[0])
+ .withTag(parser.getDockerImage().split(":")[1])
+ .exec(new PullImageResultCallback()).awaitCompletion();
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while pulling image", e);
+ throw e;
+ }
+
+ logger.info("Successfully pulled image " + parser.getDockerImage());
+
+ String containerId = UUID.randomUUID().toString();
+ String commands[] = parser.getExecCommand().split(" ");
+ CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parser.getDockerImage()).withCmd(commands).withName(containerId)
+ .withBinds(Bind.parse(inputPath + ":" + parser.getInputPath()),
+ Bind.parse(outputPath + ":" + parser.getOutputPath()))
+ .withTty(true)
+ .withAttachStdin(true)
+ .withAttachStdout(true).withEnv(environmentValues.entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue())
+ .collect(Collectors.toList()))
+ .exec();
+
+ logger.info("Created the container with id " + containerResponse.getId());
+
+ final StringBuilder dockerLogs = new StringBuilder();
+
+ if (containerResponse.getWarnings() != null && containerResponse.getWarnings().length > 0) {
+ StringBuilder warningStr = new StringBuilder();
+ for (String w : containerResponse.getWarnings()) {
+ warningStr.append(w).append(",");
+ }
+ logger.warn("Container " + containerResponse.getId() + " warnings : " + warningStr);
+ } else {
+ logger.info("Starting container with id " + containerResponse.getId());
+ dockerClient.startContainerCmd(containerResponse.getId()).exec();
+ LogContainerCmd logContainerCmd = dockerClient.logContainerCmd(containerResponse.getId()).withStdOut(true).withStdErr(true);
+
+ try {
+
+ logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
+ @Override
+ public void onNext(Frame item) {
+ logger.info("Got frame: {}", item);;
+ if (item.getStreamType() == StreamType.STDOUT) {
+ dockerLogs.append(new String(item.getPayload(), StandardCharsets.UTF_8));
+ dockerLogs.append("\n");
+ } else if (item.getStreamType() == StreamType.STDERR) {
+ dockerLogs.append(new String(item.getPayload(), StandardCharsets.UTF_8));
+ dockerLogs.append("\n");
+ }
+ super.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ logger.error("Errored while running the container {}", containerId, throwable);
+ super.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ logger.info("Container {} successfully completed", containerId);
+ super.onComplete();
+ }
+ }).awaitCompletion();
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while reading container log" + e.getMessage());
+ throw e;
+ }
+
+ logger.info("Waiting for the container to stop");
+
+ Integer statusCode = dockerClient.waitContainerCmd(containerResponse.getId()).exec(new WaitContainerResultCallback()).awaitStatusCode();
+ logger.info("Container " + containerResponse.getId() + " exited with status code " + statusCode);
+ if (statusCode != 0) {
+ logger.error("Failing as non zero status code was returned");
+ throw new Exception("Failing as non zero status code was returned");
+ }
+
+ logger.info("Container logs " + dockerLogs.toString());
+ }
+ }
+
+ public String getParserId() {
+ return parserId.get();
+ }
+
+ public void setParserId(String parserId) {
+ this.parserId.set(parserId);
+ }
+
+ public StringMap getInputMapping() {
+ return inputMapping.get();
+ }
+
+ public void setInputMapping(StringMap inputMapping) {
+ this.inputMapping.set(inputMapping);
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
new file mode 100644
index 0000000..7bb9489
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/MetadataPersistTask.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import com.google.protobuf.Struct;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.storage.AddResourceMetadataRequest;
+import org.apache.airavata.datalake.drms.storage.ResourceServiceGrpc;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Base64;
+
+@BlockingTaskDef(name = "MetadataPersistTask")
+public class MetadataPersistTask extends BlockingTask {
+
+ private final static Logger logger = LoggerFactory.getLogger(MetadataPersistTask.class);
+
+ @TaskParam(name = "JSON_FILE")
+ private final ThreadLocal<String> jsonFile = new ThreadLocal<>();
+
+ @TaskParam(name = "DRMS_HOST")
+ private final ThreadLocal<String> drmsHost = new ThreadLocal<>();
+
+ @TaskParam(name = "DRMS_PORT")
+ private final ThreadLocal<Integer> drmsPort = new ThreadLocal<>();
+
+ @TaskParam(name = "RESOURCE_ID")
+ private final ThreadLocal<String> resourceId = new ThreadLocal<>();
+
+ @TaskParam(name = "DRMS_SERVICE_ACCOUNT_KEY")
+ private final ThreadLocal<String> serviceAccountKey = new ThreadLocal<>();
+
+ @TaskParam(name = "DRMS_SERVICE_ACCOUNT_SECRET")
+ private final ThreadLocal<String> serviceAccountSecret = new ThreadLocal<>();
+
+ @TaskParam(name = "USER")
+ private final ThreadLocal<String> user = new ThreadLocal<>();
+
+ @TaskParam(name = "TENANT")
+ private final ThreadLocal<String> tenant = new ThreadLocal<>();
+
+ @Override
+ public TaskResult runBlockingCode() throws Exception {
+
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(getDrmsHost(), getDrmsPort()).usePlaintext().build();
+
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(Base64.getEncoder()
+ .encodeToString((getServiceAccountKey() + ":" + getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)))
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(getUser())
+ .setTenantId(getTenant())
+ .build())
+ .build();
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceGrpc.newBlockingStub(channel);
+
+ String derivedFilePath = getJsonFile();
+ if (derivedFilePath.startsWith("$")) {
+ derivedFilePath = getUserContent(derivedFilePath.substring(1), Scope.WORKFLOW);
+ }
+
+ logger.info("Using json file {}", derivedFilePath);
+
+ String jsonString = new String(Files.readAllBytes(Path.of(derivedFilePath)));
+ //logger.info("Json Content {}", jsonString);
+
+ Struct.Builder structBuilder = Struct.newBuilder();
+ JsonFormat.parser().merge(jsonString, structBuilder);
+
+ logger.info("Adding metadata to resource {}", getResourceId());
+ resourceClient.addResourceMetadata(AddResourceMetadataRequest.newBuilder()
+ .setResourceId(getResourceId())
+ .setAuthToken(serviceAuthToken)
+ .setType("FILE")
+ .setMetadata(structBuilder.build()).build());
+ return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+ }
+
+ public String getJsonFile() {
+ return jsonFile.get();
+ }
+
+ public void setJsonFile(String jsonFile) {
+ this.jsonFile.set(jsonFile);
+ }
+
+ public String getDrmsHost() {
+ return drmsHost.get();
+ }
+
+ public void setDrmsHost(String drmsHost) {
+ this.drmsHost.set(drmsHost);
+ }
+
+ public int getDrmsPort() {
+ return drmsPort.get();
+ }
+
+ public void setDrmsPort(int drmsPort) {
+ this.drmsPort.set(drmsPort);
+ }
+
+ public String getResourceId() {
+ return resourceId.get();
+ }
+
+ public void setResourceId(String resourceId) {
+ this.resourceId.set(resourceId);
+ }
+
+ public String getServiceAccountKey() {
+ return serviceAccountKey.get();
+ }
+
+ public void setServiceAccountKey(String serviceAccountKey) {
+ this.serviceAccountKey.set(serviceAccountKey);
+ }
+
+ public String getServiceAccountSecret() {
+ return serviceAccountSecret.get();
+ }
+
+ public void setServiceAccountSecret(String serviceAccountSecret) {
+ this.serviceAccountSecret.set(serviceAccountSecret);
+ }
+
+ public String getUser() {
+ return user.get();
+ }
+
+ public void setUser(String user) {
+ this.user.set(user);
+ }
+
+ public String getTenant() {
+ return tenant.get();
+ }
+
+ public void setTenant(String tenant) {
+ this.tenant.set(tenant);
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
new file mode 100644
index 0000000..eff1e69
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/SyncLocalDataDownloadTask.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.*;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+
+@BlockingTaskDef(name = "SyncLocalDataDownloadTask")
+public class SyncLocalDataDownloadTask extends BlockingTask {
+
+ private final static Logger logger = LoggerFactory.getLogger(SyncLocalDataDownloadTask.class);
+
+ @TaskParam(name = "MFTAPIHost")
+ private final ThreadLocal<String> mftHost = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTAPIPort")
+ private final ThreadLocal<Integer> mftPort = new ThreadLocal<>();
+
+ // Security
+ @TaskParam(name = "UserId")
+ private final ThreadLocal<String> userId = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTClientId")
+ private final ThreadLocal<String> mftClientId = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTClientSecret")
+ private final ThreadLocal<String> mftClientSecret = new ThreadLocal<>();
+
+ @TaskParam(name = "TenantId")
+ private final ThreadLocal<String> tenantId = new ThreadLocal<>();
+
+ @TaskParam(name = "SourceResourceId")
+ private final ThreadLocal<String> sourceResourceId = new ThreadLocal<>();
+
+ @TaskParam(name = "SourceCredToken")
+ private final ThreadLocal<String> sourceCredToken = new ThreadLocal<>();
+
+ public static void main(String args[]) {
+
+
+ SyncLocalDataDownloadTask task = new SyncLocalDataDownloadTask();
+ task.setMftClientId("mft-agent");
+ task.setMftClientSecret("kHqH27BloDCbLvwUA8ZYRlHcJxXZyby9PB90bTdU");
+ task.setUserId("isjarana");
+ task.setTenantId("custos-ii8g0cfwsz6ruwezykn9-10002640");
+ task.setMftHost("149.165.157.235");
+ task.setMftPort(7004);
+ task.setSourceResourceId("46c7659360f8bdb1473fc16572d98a8611cfd7bb2296660dcd917ed4b6d33c13");
+ task.setSourceCredToken("ed1af924-2a91-412c-bedf-d9321252456d");
+
+ task.runBlockingCode();
+ }
+
+ @Override
+ public TaskResult runBlockingCode() {
+
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(getMftHost(), getMftPort());
+
+ DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+ .setUserId(getUserId())
+ .setClientId(getMftClientId())
+ .setClientSecret(getMftClientSecret())
+ .putProperties("TENANT_ID", getTenantId()).build();
+
+ HttpDownloadApiResponse httpDownloadApiResponse = mftClient.submitHttpDownload(HttpDownloadApiRequest
+ .newBuilder()
+ .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build())
+ .setSourceResourceId(getSourceResourceId())
+ .setSourceToken(getSourceCredToken())
+ .setSourceType("SCP")
+ .setSourceResourceChildPath("")
+ .build());
+
+ FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
+ .setResourceType("SCP")
+ .setResourceId(getSourceResourceId())
+ .setResourceToken(getSourceCredToken())
+ .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
+
+ String downloadUrl = httpDownloadApiResponse.getUrl();
+ logger.info("Using download URL {}", downloadUrl);
+
+ String downloadPath = "/tmp/" + metadata.getFriendlyName();
+ try (BufferedInputStream in = new BufferedInputStream(new URL(downloadUrl).openStream());
+ FileOutputStream fileOutputStream = new FileOutputStream(downloadPath)) {
+ byte dataBuffer[] = new byte[1024];
+ int bytesRead;
+ while ((bytesRead = in.read(dataBuffer, 0, 1024)) != -1) {
+ fileOutputStream.write(dataBuffer, 0, bytesRead);
+ }
+ } catch (IOException e) {
+ logger.error("Failed to download file", e);
+ return new TaskResult(TaskResult.Status.FAILED, "Failed to download file");
+ }
+
+ logger.info("Downloaded to path {}", downloadPath);
+
+ putUserContent("DOWNLOAD_PATH", downloadPath, Scope.WORKFLOW);
+ return new TaskResult(TaskResult.Status.COMPLETED, "Success");
+ }
+
+ public String getSourceResourceId() {
+ return sourceResourceId.get();
+ }
+
+ public void setSourceResourceId(String sourceResourceId) {
+ this.sourceResourceId.set(sourceResourceId);
+ }
+
+ public String getSourceCredToken() {
+ return sourceCredToken.get();
+ }
+
+ public void setSourceCredToken(String sourceCredToken) {
+ this.sourceCredToken.set(sourceCredToken);
+ }
+
+ public String getUserId() {
+ return userId.get();
+ }
+
+ public void setUserId(String userId) {
+ this.userId.set(userId);
+ }
+
+ public String getMftHost() {
+ return mftHost.get();
+ }
+
+ public void setMftHost(String mftHost) {
+ this.mftHost.set(mftHost);
+ }
+
+ public Integer getMftPort() {
+ return mftPort.get();
+ }
+
+ public void setMftPort(Integer mftPort) {
+ this.mftPort.set(mftPort);
+ }
+
+ public String getMftClientId() {
+ return mftClientId.get();
+ }
+
+ public void setMftClientId(String mftClientId) {
+ this.mftClientId.set(mftClientId);
+ }
+
+ public String getMftClientSecret() {
+ return mftClientSecret.get();
+ }
+
+ public void setMftClientSecret(String mftClientSecret) {
+ this.mftClientSecret.set(mftClientSecret);
+ }
+
+ public String getTenantId() {
+ return tenantId.get();
+ }
+
+ public void setTenantId(String tenantId) {
+ this.tenantId.set(tenantId);
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/types/StringMap.java
similarity index 66%
copy from data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/types/StringMap.java
index 9033f1a..4f210a7 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/types/StringMap.java
@@ -15,28 +15,23 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.types;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType;
-public abstract class BlockingTask extends AbstractTask {
+import java.util.HashMap;
- private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
- public BlockingTask() {
- }
+public class StringMap extends HashMap<String, String> implements TaskParamType {
@Override
- public TaskResult onRun() {
- return runBlockingCode();
+ public String serialize() {
+ return new Gson().toJson(this);
}
- public abstract TaskResult runBlockingCode();
-
@Override
- public void onCancel() {
-
+ public void deserialize(String content) {
+ StringMap stringMap = new Gson().fromJson(content, StringMap.class);
+ this.putAll(stringMap);
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java
index 1b9e761..4415dd2 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java
@@ -222,8 +222,8 @@ public class WorkflowOperator {
TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
if (outPortAnnotation != null) {
field.setAccessible(true);
- OutPort outPort = (OutPort) field.get(taskObj);
- outPorts.add(outPort);
+ List<OutPort> outPort = (List<OutPort>) field.get(taskObj);
+ outPorts.addAll(outPort);
}
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml b/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml
index 7c4394a..1c167e0 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml
+++ b/data-orchestrator/workflow-engine/workflow-engine-worker/src/main/resources/task-list.yaml
@@ -1,6 +1,9 @@
tasks:
blocking:
- org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask
+ - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.SyncLocalDataDownloadTask
+ - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.GenericDataParsingTask
+ - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.MetadataPersistTask
nonBlocking:
- org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
- org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask
diff --git a/data-resource-management-service/drms-api/pom.xml b/data-resource-management-service/drms-api/pom.xml
index bd63411..be1b2d3 100644
--- a/data-resource-management-service/drms-api/pom.xml
+++ b/data-resource-management-service/drms-api/pom.xml
@@ -53,6 +53,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.data.jpa}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-to-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>net.sf.dozer</groupId>
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index 625ba8b..9c7e445 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -761,7 +761,8 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
responseObserver.onCompleted();
} catch (Exception ex) {
String msg = " Error occurred while adding resource metadata " + ex.getMessage();
- logger.error(" Error occurred while adding resource metadata: Messages {} ", ex.getMessage(), ex);
+ // Issue https://github.com/neo4j/neo4j-java-driver/issues/773
+ logger.error("Error occurred while adding resource metadata: Messages {}", ex.getMessage());
responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
}
}
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 57858f9..7692b2c 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -215,6 +215,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
@Override
public void searchStoragePreference(StoragePreferenceSearchRequest request, StreamObserver<StoragePreferenceSearchResponse> responseObserver) {
try {
+
+ System.out.println(request);
AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
Map<String, Object> userProps = new HashMap<>();
diff --git a/pom.xml b/pom.xml
index 2c4f041..5dcb453 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,7 +135,7 @@
<maven.surefile.plugin.version>3.0.0-M4</maven.surefile.plugin.version>
- <log.back.version>1.2.3</log.back.version>
+ <log.back.version>1.2.5</log.back.version>
<com.jcraft.version>0.1.55</com.jcraft.version>
@@ -143,8 +143,8 @@
<com.codahale.version>0.7.0</com.codahale.version>
- <neo4j.ogm.version>3.2.20</neo4j.ogm.version>
- <neo4j.version>3.4.6</neo4j.version>
+ <neo4j.ogm.version>3.2.25</neo4j.ogm.version>
+ <neo4j.version>4.3.2</neo4j.version>
<io.grpc.version>1.25.0</io.grpc.version>
<spring-security.version>5.3.4.RELEASE</spring-security.version>
<kafka-clients.version>1.0.0</kafka-clients.version>
@@ -153,6 +153,9 @@
<yaml.version>1.15</yaml.version>
<spring.boot.version>2.4.5</spring.boot.version>
<commons.beanutils.version>1.9.4</commons.beanutils.version>
+ <docker.java.version>3.2.11</docker.java.version>
+ <jackson.version>2.11.4</jackson.version>
+ <commons.io.version>2.6</commons.io.version>
</properties>
</project>