You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/02 09:11:14 UTC
[airavata-data-lake] branch master updated: Adding an API to fetch
MFT callback entries and implementing first part of non blocking MFT
transfers
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new bf113f9 Adding an API to fetch MFT callback entries and implementing first part of non blocking MFT transfers
bf113f9 is described below
commit bf113f96a0e026788f33218dd14c11d170b2bd8e
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jun 2 05:10:01 2021 -0400
Adding an API to fetch MFT callback entries and implementing first part of non blocking MFT transfers
---
data-orchestrator/workflow-engine/README.md | 6 +-
data-orchestrator/workflow-engine/pom.xml | 21 ++-
.../engine/services/handler/APIRunner.java | 27 ---
.../services/handler/WorkflowEngineAPIHandler.java | 38 ----
.../services/wm/DataSyncWorkflowManager.java | 91 ----------
.../engine/services/wm/PreWorkflowManager.java | 47 -----
.../engine/client/WorkflowEngineClient.java | 24 +++
.../pom.xml | 14 +-
.../workflow/engine/monitor/AsyncEventMonitor.java | 16 ++
.../engine/monitor/EventMonitorConfig.java} | 12 +-
.../monitor/filter/mft/DataTransferEvent.java} | 34 ++--
.../filter/mft/DataTransferEventDeserializer.java} | 31 ++--
.../filter/mft/DataTransferEventSerializer.java} | 27 +--
.../filter/mft/KafkaMFTMessageProducer.java | 62 +++++++
.../engine/monitor/filter/mft/MFTFilter.java | 68 ++++++++
.../engine/services/controller/Controller.java | 12 +-
.../engine/services/handler/APIRunner.java | 50 ++++++
.../engine/services/handler/AppConfig.java} | 35 ++--
.../services/handler/WorkflowEngineAPIHandler.java | 87 ++++++++++
.../engine/services/participant/Participant.java | 13 +-
.../services/wm/CallbackWorkflowEntity.java} | 54 ++++--
.../engine/services/wm/CallbackWorkflowStore.java} | 28 ++-
.../engine/services/wm/WorkflowOperator.java | 25 ++-
.../wm/datasync/DataSyncWorkflowManager.java | 179 +++++++++++++++++++
.../services/wm/datasync/MFTCallbackEntity.java} | 41 +++--
.../services/wm/datasync/MFTCallbackStore.java} | 39 ++---
.../workflow/engine/task/AbstractTask.java | 0
.../engine/task/BiSectionNonBlockingTask.java} | 24 ++-
.../workflow/engine/task/BlockingTask.java | 0
.../workflow/engine/task/NonBlockingTask.java | 0
.../orchestrator/workflow/engine/task/OutPort.java | 0
.../workflow/engine/task/TaskParamType.java | 0
.../engine/task/annotation/BlockingTaskDef.java | 0
.../engine/task/annotation/NonBlockingSection.java | 0
.../engine/task/annotation/NonBlockingTaskDef.java | 0
.../engine/task/annotation/TaskOutPort.java | 0
.../workflow/engine/task/annotation/TaskParam.java | 0
.../engine/task/impl/AsyncDataTransferTask.java | 193 +++++++++++++++++++++
.../engine/task/impl/ExampleBlockingTask.java | 0
.../engine/task/impl/ExampleNonBlockingTask.java | 0
.../src/main/resources/application.properties | 12 +-
.../src/main/resources/logback.xml | 0
.../src/main/resources/task-list.yaml | 3 +-
.../src/main/proto/service/WorkflowService.proto | 21 ++-
pom.xml | 2 +-
45 files changed, 961 insertions(+), 375 deletions(-)
diff --git a/data-orchestrator/workflow-engine/README.md b/data-orchestrator/workflow-engine/README.md
index 94bc9eb..712de75 100644
--- a/data-orchestrator/workflow-engine/README.md
+++ b/data-orchestrator/workflow-engine/README.md
@@ -1,8 +1,8 @@
### Service Execution Order
-* Controller
-* Participant
-* DataSyncWorkflowManager
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant.Participant
+* org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.DataSyncWorkflowManager
### Configure the participant with new tasks
diff --git a/data-orchestrator/workflow-engine/pom.xml b/data-orchestrator/workflow-engine/pom.xml
index a19564e..4b0c738 100644
--- a/data-orchestrator/workflow-engine/pom.xml
+++ b/data-orchestrator/workflow-engine/pom.xml
@@ -31,7 +31,7 @@
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
- <module>workflow-engine-api</module>
+ <module>workflow-engine-core</module>
<module>workflow-engine-stubs</module>
<module>workflow-engine-client</module>
</modules>
@@ -56,6 +56,10 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -64,6 +68,11 @@
<version>${spring.boot.version}</version>
</dependency>
<dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <version>${spring.boot.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${log4j.over.slf4j}</version>
@@ -78,5 +87,15 @@
<artifactId>commons-beanutils</artifactId>
<version>${commons.beanutils.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>2.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>mft-api-client</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
deleted file mode 100644
index 6d69a39..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.domain.EntityScan;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
-import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
-
-@ComponentScan(basePackages = {"org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler",
- "org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm"})
-@SpringBootApplication()
-@EnableJpaAuditing
-@EnableJpaRepositories("org.apache.airavata.datalake")
-@EntityScan("org.apache.airavata.datalake")
-public class APIRunner implements CommandLineRunner {
- public static void main(String[] args) {
- SpringApplication.run(APIRunner.class, args);
- }
-
- @Override
- public void run(String... args) throws Exception {
- System.out.println("RUnning");
- }
-
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
deleted file mode 100644
index 7f760fa..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationResponse;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.PreWorkflowManager;
-import org.lognet.springboot.grpc.GRpcService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-@GRpcService
-public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServiceImplBase {
- private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowEngineAPIHandler.class);
-
- private PreWorkflowManager preworkflowManager;
-
-
- @Autowired
- public WorkflowEngineAPIHandler(PreWorkflowManager preworkflowManager) {
- this.preworkflowManager = preworkflowManager;
- }
-
- @Override
- public void invokeWorkflow(WorkflowInvocationRequest request,
- StreamObserver<WorkflowInvocationResponse> responseObserver) {
- try {
- preworkflowManager.launchWorkflow(request.getMessage().getMessageId());
-
-
- } catch (Exception ex) {
- String msg = "Error occurred while invoking blocking pipeline";
- LOGGER.error(msg, ex);
- throw new RuntimeException(msg, ex);
- }
- }
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
deleted file mode 100644
index 9f08751..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
-
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.ComponentScan;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-@SpringBootApplication()
-@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm")
-public class DataSyncWorkflowManager implements CommandLineRunner {
-
- private final static Logger logger = LoggerFactory.getLogger(DataSyncWorkflowManager.class);
-
- @org.springframework.beans.factory.annotation.Value("${cluster.name}")
- private String clusterName;
-
- @org.springframework.beans.factory.annotation.Value("${datasync.wm.name}")
- private String workflowManagerName;
-
- @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
- private String zkAddress;
-
- @Override
- public void run(String... args) throws Exception {
- WorkflowOperator workflowOperator = new WorkflowOperator();
- workflowOperator.init(clusterName, workflowManagerName, zkAddress);
-
- ExampleBlockingTask bt1 = new ExampleBlockingTask();
- bt1.setTaskId("bt1-" + UUID.randomUUID());
-
- ExampleBlockingTask bt2 = new ExampleBlockingTask();
- bt2.setTaskId("bt2-" + UUID.randomUUID());
-
- ExampleBlockingTask bt3 = new ExampleBlockingTask();
- bt3.setTaskId("bt3-" + UUID.randomUUID());
-
- ExampleBlockingTask bt4 = new ExampleBlockingTask();
- bt4.setTaskId("bt4-" + UUID.randomUUID());
-
- ExampleNonBlockingTask nbt1 = new ExampleNonBlockingTask();
- nbt1.setTaskId("nbt1-" + UUID.randomUUID());
- nbt1.setCurrentSection(2);
-
- // Setting dependency
- bt1.setOutPort(new OutPort().setNextTaskId(nbt1.getTaskId()));
- //bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
- //bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId()));
-
- Map<String, AbstractTask> taskMap = new HashMap<>();
- taskMap.put(bt1.getTaskId(), bt1);
- taskMap.put(nbt1.getTaskId(), nbt1);
- //taskMap.put(bt2.getTaskId(), bt2);
- //taskMap.put(bt3.getTaskId(), bt3);
- //taskMap.put(bt4.getTaskId(), bt4);
- //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
- String[] startTaskIds = {bt1.getTaskId()};
- String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
- logger.info("Launched workflow {}", workflowId);
- }
-
- public static void main(String args[]) throws Exception {
- SpringApplication.run(DataSyncWorkflowManager.class);
- }
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/PreWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/PreWorkflowManager.java
deleted file mode 100644
index 366c6ab..0000000
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/PreWorkflowManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
-
-import org.apache.airavata.datalake.orchestrator.registry.persistance.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.Optional;
-import java.util.UUID;
-
-/**
- * A class responsible to create task DAG and launch experiment
- */
-@Component
-public class PreWorkflowManager {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(PreWorkflowManager.class);
-
- private DataOrchestratorEventRepository dataOrchestratorEventRepository;
-
- private WorkflowEntityRepository workflowEntityRepository;
-
- @Autowired
- public PreWorkflowManager(DataOrchestratorEventRepository dataOrchestratorEventRepository,
- WorkflowEntityRepository workflowEntityRepository) {
- this.dataOrchestratorEventRepository = dataOrchestratorEventRepository;
- this.workflowEntityRepository = workflowEntityRepository;
- }
-
- public boolean launchWorkflow(String msgId) {
-
- Optional<DataOrchestratorEntity> dataOrchestratorEntity = dataOrchestratorEventRepository.findById(msgId);
- dataOrchestratorEntity.ifPresent(enty -> {
- String workflowId = "WORKFLOW_" + UUID.randomUUID().toString();
- WorkflowEntity workFlowEntity = new WorkflowEntity();
- workFlowEntity.setId(workflowId);
- workFlowEntity.setDataOrchestratorEntity(enty);
- workFlowEntity.setStatus(EntityStatus.WORKFLOW_LAUNCHED.name());
- workflowEntityRepository.save(workFlowEntity);
- });
-
- return true;
- }
-
-
-}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java b/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java
index cfec521..040ce39 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-client/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/client/WorkflowEngineClient.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.airavata.datalake.orchestrator.workflow.engine.client;
import io.grpc.ManagedChannel;
@@ -7,6 +24,12 @@ import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage
import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
public class WorkflowEngineClient {
+
+ public static WorkflowServiceGrpc.WorkflowServiceBlockingStub buildClient(String host, int port) {
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
+ return WorkflowServiceGrpc.newBlockingStub(channel);
+ }
+
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565).usePlaintext().build();
WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceStub =
@@ -16,5 +39,6 @@ public class WorkflowEngineClient {
.setMessage(WorkflowMessage.newBuilder().setMessageId("387bd8e9-58ce-4626-8347-90a8ab12376e "))
.build();
workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+ System.out.println("Done");
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/pom.xml b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
similarity index 88%
rename from data-orchestrator/workflow-engine/workflow-engine-api/pom.xml
rename to data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
index 594c438..56a0a07 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/pom.xml
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/pom.xml
@@ -11,7 +11,7 @@
<modelVersion>4.0.0</modelVersion>
- <artifactId>workflow-engine-api</artifactId>
+ <artifactId>workflow-engine-core</artifactId>
<dependencies>
@@ -63,7 +63,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
- <version>2.4.3</version>
+ <version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -97,6 +97,16 @@
<version>1.27</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>2.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>workflow-engine-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<properties>
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
similarity index 50%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
index 4afa566..7ff64bf 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
@@ -17,5 +17,21 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+
+@SpringBootApplication
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.monitor")
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+ DataSourceTransactionManagerAutoConfiguration.class,
+ HibernateJpaAutoConfiguration.class})
public class AsyncEventMonitor {
+ public static void main(String[] args) {
+ SpringApplication.run(AsyncEventMonitor.class, args);
+ }
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/EventMonitorConfig.java
similarity index 67%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/EventMonitorConfig.java
index 4afa566..a1d7d9b 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/EventMonitorConfig.java
@@ -17,5 +17,15 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;
-public class AsyncEventMonitor {
+import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.KafkaMFTMessageProducer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class EventMonitorConfig {
+
+ @Bean(initMethod = "init")
+ public KafkaMFTMessageProducer kafkaMFTMessageProducer() {
+ return new KafkaMFTMessageProducer();
+ }
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEvent.java
similarity index 58%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEvent.java
index 9033f1a..ffa0dab 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEvent.java
@@ -15,28 +15,34 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class DataTransferEvent {
+ private String taskId;
+ private String workflowId;
+ private String transferStatus;
-public abstract class BlockingTask extends AbstractTask {
-
- private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+ public String getTaskId() {
+ return taskId;
+ }
- public BlockingTask() {
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
}
- @Override
- public TaskResult onRun() {
- return runBlockingCode();
+ public String getWorkflowId() {
+ return workflowId;
}
- public abstract TaskResult runBlockingCode();
+ public void setWorkflowId(String workflowId) {
+ this.workflowId = workflowId;
+ }
- @Override
- public void onCancel() {
+ public String getTransferStatus() {
+ return transferStatus;
+ }
+ public void setTransferStatus(String transferStatus) {
+ this.transferStatus = transferStatus;
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventDeserializer.java
similarity index 62%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventDeserializer.java
index 9033f1a..12e2066 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventDeserializer.java
@@ -15,28 +15,19 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class BlockingTask extends AbstractTask {
-
- private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
- public BlockingTask() {
- }
+import org.apache.kafka.common.serialization.Deserializer;
+public class DataTransferEventDeserializer implements Deserializer<DataTransferEvent> {
@Override
- public TaskResult onRun() {
- return runBlockingCode();
- }
-
- public abstract TaskResult runBlockingCode();
-
- @Override
- public void onCancel() {
-
+ public DataTransferEvent deserialize(String s, byte[] bytes) {
+ String deserializedData = new String(bytes);
+ String[] parts = deserializedData.split("/");
+ DataTransferEvent dte = new DataTransferEvent();
+ dte.setTaskId(parts[0]);
+ dte.setWorkflowId(parts[1]);
+ dte.setTransferStatus(parts[2]);
+ return dte;
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventSerializer.java
similarity index 63%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventSerializer.java
index 9033f1a..c3a2d35 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/DataTransferEventSerializer.java
@@ -15,28 +15,19 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kafka.common.serialization.Serializer;
-public abstract class BlockingTask extends AbstractTask {
+import java.nio.charset.StandardCharsets;
- private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
- public BlockingTask() {
- }
+public class DataTransferEventSerializer implements Serializer<DataTransferEvent> {
@Override
- public TaskResult onRun() {
- return runBlockingCode();
- }
-
- public abstract TaskResult runBlockingCode();
-
- @Override
- public void onCancel() {
-
+ public byte[] serialize(String s, DataTransferEvent dataTransferEvent) {
+ String data = dataTransferEvent.getTaskId() +
+ "/" + dataTransferEvent.getWorkflowId() +
+ "/" + dataTransferEvent.getTransferStatus();
+ return data.getBytes(StandardCharsets.UTF_8);
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/KafkaMFTMessageProducer.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/KafkaMFTMessageProducer.java
new file mode 100644
index 0000000..c3b257c
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/KafkaMFTMessageProducer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
+
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class KafkaMFTMessageProducer {
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.url}")
+ private String kafkaUrl;
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.mft.publisher.name}")
+ private String publisherName;
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.mft.status.publish.topic}")
+ private String topic;
+
+ private Producer<String, DataTransferEvent> producer;
+
+ public void init() {
+ this.producer = createProducer();
+ }
+
+ private Producer<String, DataTransferEvent> createProducer() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, publisherName);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ DataTransferEventSerializer.class.getName());
+ return new KafkaProducer<String, DataTransferEvent>(props);
+ }
+
+ public void publish(DataTransferEvent event) throws ExecutionException, InterruptedException {
+ final ProducerRecord<String, DataTransferEvent> record = new ProducerRecord<>(
+ topic,
+ event.getTaskId(),
+ event);
+ RecordMetadata recordMetadata = producer.send(record).get();
+ producer.flush();
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/MFTFilter.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/MFTFilter.java
new file mode 100644
index 0000000..6c76175
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/filter/mft/MFTFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbackGetRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbackGetResponse;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.client.WorkflowEngineClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(path = "/mft")
+public class MFTFilter {
+
+ private final static Logger logger = LoggerFactory.getLogger(MFTFilter.class);
+
+ @Autowired
+ private KafkaMFTMessageProducer mftMessageProducer;
+
+ @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.host}")
+ private String datasyncWmHost;
+
+ @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.port}")
+ private int datasyncWmPort;
+
+ @GetMapping(value = "/{transferId}/{status}")
+ public String fetchSFTPRemote(@PathVariable(name = "transferId") String transferId,
+ @PathVariable(name = "status") String status) {
+
+ logger.info("Got transfer status {} for transfer id {}", status, transferId);
+
+ WorkflowServiceGrpc.WorkflowServiceBlockingStub wmClient = WorkflowEngineClient.buildClient(datasyncWmHost, datasyncWmPort);
+ MFTCallbackGetResponse mftCallback = wmClient.getMFTCallback(
+ MFTCallbackGetRequest.newBuilder().setMftTransferId(transferId).build());
+
+ DataTransferEvent dataTransferEvent = new DataTransferEvent();
+ dataTransferEvent.setTransferStatus(status);
+ dataTransferEvent.setTaskId(mftCallback.getTaskId());
+ dataTransferEvent.setWorkflowId(mftCallback.getWorkflowId());
+ try {
+ mftMessageProducer.publish(dataTransferEvent);
+ } catch (Exception e) {
+ logger.error("Failed to publish even to kafka with transfer id {} and status {}", transferId, status);
+ }
+ return "OK";
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
similarity index 83%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
index 83e6af5..bcbba4a 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
@@ -25,12 +25,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.CountDownLatch;
@SpringBootApplication()
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+ DataSourceTransactionManagerAutoConfiguration.class,
+ HibernateJpaAutoConfiguration.class})
@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller")
public class Controller implements CommandLineRunner {
@@ -89,6 +97,8 @@ public class Controller implements CommandLineRunner {
}
public static void main(String args[]) throws Exception {
- SpringApplication.run(Controller.class);
+ SpringApplication app = new SpringApplication(Controller.class);
+ app.setWebApplicationType(WebApplicationType.NONE);
+ app.run(args);
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
new file mode 100644
index 0000000..329bc40
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/APIRunner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+
+@ComponentScan(basePackages = {"org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler",
+ "org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm"})
+@SpringBootApplication()
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+ DataSourceTransactionManagerAutoConfiguration.class,
+ HibernateJpaAutoConfiguration.class})
+/*@EnableJpaAuditing
+@EnableJpaRepositories("org.apache.airavata.datalake")
+@EntityScan("org.apache.airavata.datalake")*/
+public class APIRunner implements CommandLineRunner {
+ public static void main(String[] args) {
+ SpringApplication app = new SpringApplication(APIRunner.class);
+ app.setWebApplicationType(WebApplicationType.NONE);
+ app.run(args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ System.out.println("Running");
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/AppConfig.java
similarity index 60%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/AppConfig.java
index 0bdd9c9..ece0005 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/AppConfig.java
@@ -15,29 +15,32 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
-import org.apache.helix.task.TaskResult;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.DataSyncWorkflowManager;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
-@NonBlockingTaskDef(name = "ExampleNonBlockingTask")
-public class ExampleNonBlockingTask extends NonBlockingTask {
+@Configuration
+public class AppConfig {
+ private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- private final static Logger logger = LoggerFactory.getLogger(ExampleNonBlockingTask.class);
+ @Bean(initMethod = "init")
+ public DataSyncWorkflowManager dataSyncWorkflowManager() {
+ return new DataSyncWorkflowManager();
+ }
- @NonBlockingSection(sectionIndex = 1)
- public TaskResult section1() {
- logger.info("Running section 1");
- return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+ @Bean
+ public CallbackWorkflowStore callbackWorkflowStore() {
+ return new CallbackWorkflowStore();
}
- @NonBlockingSection(sectionIndex = 2)
- public TaskResult section2() {
- logger.info("Running section 2");
- return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+ @Bean
+ public MFTCallbackStore mftCallbackStore() {
+ return new MFTCallbackStore();
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
new file mode 100644
index 0000000..c4e44c1
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/handler/WorkflowEngineAPIHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.handler;
+
+import com.google.protobuf.Empty;
+import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.*;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.DataSyncWorkflowManager;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackEntity;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync.MFTCallbackStore;
+import org.dozer.DozerBeanMapper;
+import org.lognet.springboot.grpc.GRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.Optional;
+
+@GRpcService
+public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServiceImplBase {
+ private static final Logger logger = LoggerFactory.getLogger(WorkflowEngineAPIHandler.class);
+
+ @Autowired
+ private MFTCallbackStore mftCallbackStore;
+
+ @Autowired
+ private CallbackWorkflowStore callbackWorkflowStore;
+
+ @Autowired
+ private DataSyncWorkflowManager dataSyncWorkflowManager;
+
+ @Override
+ public void invokeWorkflow(WorkflowInvocationRequest request,
+ StreamObserver<WorkflowInvocationResponse> responseObserver) {
+ try {
+ dataSyncWorkflowManager.submitDataSyncWorkflow();
+ responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
+ responseObserver.onCompleted();
+ } catch (Exception ex) {
+ String msg = "Error occurred while invoking blocking pipeline";
+ logger.error(msg, ex);
+ responseObserver.onError(new Exception(msg));
+ }
+ }
+
+ @Override
+ public void getMFTCallback(MFTCallbackGetRequest request, StreamObserver<MFTCallbackGetResponse> responseObserver) {
+ DozerBeanMapper mapper = new DozerBeanMapper();
+ Optional<MFTCallbackEntity> mftCallbackOp = mftCallbackStore.findMFTCallback(request.getMftTransferId());
+ if (mftCallbackOp.isPresent()) {
+ MFTCallbackEntity callbackEntity = mftCallbackOp.get();
+ MFTCallbackGetResponse.Builder builder = MFTCallbackGetResponse.newBuilder();
+ mapper.map(callbackEntity, builder);
+ responseObserver.onNext(builder.build());
+ responseObserver.onCompleted();
+ } else {
+ logger.error("No callback entity for mft transfer id {}", request.getMftTransferId());
+ responseObserver.onError(new Exception("No callback entity for mft transfer id " + request.getMftTransferId()));
+ }
+ }
+
+ @Override
+ public void saveMFTCallback(MFTCallbacSaveRequest request, StreamObserver<Empty> responseObserver) {
+ DozerBeanMapper mapper = new DozerBeanMapper();
+ MFTCallbackEntity entity = mapper.map(request, MFTCallbackEntity.class);
+ mftCallbackStore.saveMFTCallBack(entity);
+ logger.info("Saved callback entity for transfer id {}", request.getMftTransferId());
+ responseObserver.onNext(Empty.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
similarity index 93%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
index 65314ee..69ecaf3 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java
@@ -17,6 +17,7 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
@@ -36,7 +37,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.yaml.snakeyaml.Yaml;
@@ -44,6 +50,9 @@ import java.io.*;
import java.util.*;
@SpringBootApplication()
+@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,
+ DataSourceTransactionManagerAutoConfiguration.class,
+ HibernateJpaAutoConfiguration.class})
@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant")
public class Participant implements CommandLineRunner {
@@ -268,6 +277,8 @@ public class Participant implements CommandLineRunner {
}
public static void main(String args[]) throws Exception {
- SpringApplication.run(Participant.class);
+ SpringApplication app = new SpringApplication(Participant.class);
+ app.setWebApplicationType(WebApplicationType.NONE);
+ app.run(args);
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
similarity index 50%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
index 0bdd9c9..7a14573 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowEntity.java
@@ -15,29 +15,47 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
-import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
-@NonBlockingTaskDef(name = "ExampleNonBlockingTask")
-public class ExampleNonBlockingTask extends NonBlockingTask {
+import java.util.Map;
- private final static Logger logger = LoggerFactory.getLogger(ExampleNonBlockingTask.class);
+public class CallbackWorkflowEntity {
+ private int prevSectionIndex;
+ private Map<String, AbstractTask> taskMap;
+ private String workflowId;
+ private String startTaskId;
- @NonBlockingSection(sectionIndex = 1)
- public TaskResult section1() {
- logger.info("Running section 1");
- return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+ public int getPrevSectionIndex() {
+ return prevSectionIndex;
}
- @NonBlockingSection(sectionIndex = 2)
- public TaskResult section2() {
- logger.info("Running section 2");
- return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+ public void setPrevSectionIndex(int prevSectionIndex) {
+ this.prevSectionIndex = prevSectionIndex;
+ }
+
+ public Map<String, AbstractTask> getTaskMap() {
+ return taskMap;
+ }
+
+ public void setTaskMap(Map<String, AbstractTask> taskMap) {
+ this.taskMap = taskMap;
+ }
+
+ public String getWorkflowId() {
+ return workflowId;
+ }
+
+ public void setWorkflowId(String workflowId) {
+ this.workflowId = workflowId;
+ }
+
+ public String getStartTaskId() {
+ return startTaskId;
+ }
+
+ public void setStartTaskId(String startTaskId) {
+ this.startTaskId = startTaskId;
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowStore.java
similarity index 58%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowStore.java
index 9033f1a..0f6ef8a 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/CallbackWorkflowStore.java
@@ -15,28 +15,22 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.*;
-public abstract class BlockingTask extends AbstractTask {
+public class CallbackWorkflowStore {
- private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+ private final List<CallbackWorkflowEntity> workflowEntities = new ArrayList<>();
- public BlockingTask() {
+ public void saveWorkflowEntity(CallbackWorkflowEntity cwe) {
+ workflowEntities.add(cwe);
}
- @Override
- public TaskResult onRun() {
- return runBlockingCode();
- }
-
- public abstract TaskResult runBlockingCode();
-
- @Override
- public void onCancel() {
-
+ public Optional<CallbackWorkflowEntity> getWorkflowEntity(String workflowId, String taskId, int prevSectionIndex) {
+ return workflowEntities.stream().filter(e ->
+ e.getWorkflowId().equals(workflowId) &&
+ e.getStartTaskId().equals(taskId) &&
+ e.getPrevSectionIndex() == prevSectionIndex).findFirst();
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
similarity index 90%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
index ba3ef58..4eee6f1 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java
@@ -47,8 +47,11 @@ public class WorkflowOperator {
private TaskDriver taskDriver;
private HelixManager helixManager;
+ private CallbackWorkflowStore cbws;
- public void init(String clusterName, String workflowManagerName, String zkAddress) throws Exception {
+ public void init(String clusterName, String workflowManagerName, String zkAddress, CallbackWorkflowStore cbws)
+ throws Exception {
+ this.cbws = cbws;
helixManager = HelixManagerFactory.getZKHelixManager(clusterName, workflowManagerName,
InstanceType.SPECTATOR, zkAddress);
helixManager.connect();
@@ -83,7 +86,7 @@ public class WorkflowOperator {
Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0);
for (String startTaskId: startTaskIds) {
- buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap);
+ buildWorkflowRecursively(workflowBuilder, workflowName, startTaskId, taskMap);
}
WorkflowConfig.Builder config = new WorkflowConfig.Builder()
@@ -98,11 +101,19 @@ public class WorkflowOperator {
return workflowName;
}
- private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String nonBlockingTaskId, int currentSection) {
+ private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String workflowName,
+ String nonBlockingTaskId, int currentSection) {
+ CallbackWorkflowEntity cwe = new CallbackWorkflowEntity();
+ cwe.setWorkflowId(workflowName);
+ cwe.setPrevSectionIndex(currentSection);
+ cwe.setTaskMap(taskMap);
+ cwe.setStartTaskId(nonBlockingTaskId);
+ this.cbws.saveWorkflowEntity(cwe);
}
- private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId, Map<String, AbstractTask> taskMap)
+ private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String workflowName,
+ String nextTaskId, Map<String, AbstractTask> taskMap)
throws Exception{
AbstractTask currentTask = taskMap.get(nextTaskId);
@@ -142,7 +153,7 @@ public class WorkflowOperator {
if (outPort != null) {
workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId());
logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId());
- buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap);
+ buildWorkflowRecursively(workflowBuilder, workflowName, outPort.getNextTaskId(), taskMap);
}
}
} else if (nonBlockingTaskDef != null) {
@@ -170,7 +181,7 @@ public class WorkflowOperator {
workflowBuilder.addJob(currentTask.getTaskId(), job);
- continueNonBlockingRest(taskMap, nextTaskId, nbTask.getCurrentSection());
+ continueNonBlockingRest(taskMap, workflowName, nextTaskId, nbTask.getCurrentSection());
} else {
logger.error("Couldn't find the task def annotation in class {}", currentTask.getClass().getName());
throw new Exception("Couldn't find the task def annotation in class " + currentTask.getClass().getName());
@@ -204,7 +215,7 @@ public class WorkflowOperator {
TaskParam parm = classField.getAnnotation(TaskParam.class);
try {
if (parm != null) {
- Object propertyValue = PropertyUtils.getProperty(data, parm.name());
+ Object propertyValue = PropertyUtils.getProperty(data, classField.getName());
if (propertyValue instanceof TaskParamType) {
result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize());
} else {
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
new file mode 100644
index 0000000..53350c4
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/DataSyncWorkflowManager.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEvent;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.monitor.filter.mft.DataTransferEventDeserializer;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.CallbackWorkflowStore;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.WorkflowOperator;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class DataSyncWorkflowManager {
+
+ private final static Logger logger = LoggerFactory.getLogger(DataSyncWorkflowManager.class);
+
+ @org.springframework.beans.factory.annotation.Value("${cluster.name}")
+ private String clusterName;
+
+ @org.springframework.beans.factory.annotation.Value("${datasync.wm.name}")
+ private String workflowManagerName;
+
+ @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
+ private String zkAddress;
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.url}")
+ private String kafkaUrl;
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.mft.status.consumer.group}")
+ private String kafkaConsumerGroup;
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.mft.status.publish.topic}")
+ private String kafkaTopic;
+
+ @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.host}")
+ private String datasyncWmHost;
+
+ @org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.port}")
+ private int datasyncWmPort;
+
+ @Autowired
+ private CallbackWorkflowStore callbackWorkflowStore;
+
+ private ExecutorService kafkaMessageProcessPool = Executors.newFixedThreadPool(10);
+
+ private WorkflowOperator workflowOperator;
+
+ private Consumer<String, DataTransferEvent> createConsumer() {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DataTransferEventDeserializer.class.getName());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
+ // Create the consumer using props.
+ final Consumer<String, DataTransferEvent> consumer = new KafkaConsumer<>(props);
+ // Subscribe to the topic.
+ consumer.subscribe(Collections.singletonList(kafkaTopic));
+ return consumer;
+ }
+
+ private boolean processKakfaMessage(DataTransferEvent dte) {
+ logger.info("Processing DTE for task {}, workflow {} and status {}",
+ dte.getTaskId(), dte.getWorkflowId(), dte.getTransferStatus());
+ return true;
+ }
+
+ private void runKafkaConsumer() {
+
+ final Consumer<String, DataTransferEvent> mftEventConsumer = createConsumer();
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(() -> {
+ while (true) {
+ ConsumerRecords<String, DataTransferEvent> consumerRecords = mftEventConsumer.poll(Duration.ofMillis(1000));
+ if (!consumerRecords.isEmpty()) {
+
+ CompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(kafkaMessageProcessPool);
+
+ List<Future<Boolean>> processingFutures = new ArrayList<>();
+
+ for (TopicPartition partition : consumerRecords.partitions()) {
+ List<ConsumerRecord<String, DataTransferEvent>> partitionRecords = consumerRecords.records(partition);
+ for (ConsumerRecord<String, DataTransferEvent> record : partitionRecords) {
+ processingFutures.add(executorCompletionService.submit(() -> {
+ boolean success = processKakfaMessage(record.value());
+ logger.info("Processing DTE for task " + record.value().getTaskId() + " : " + success);
+ return success;
+ }));
+
+ mftEventConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
+ }
+ }
+
+ for (Future<Boolean> f : processingFutures) {
+ try {
+ executorCompletionService.take().get();
+ } catch (Exception e) {
+ logger.error("Failed processing DTE", e);
+ }
+ }
+ logger.info("All messages processed. Moving to next round");
+ }
+ }
+ });
+ }
+
+ public void init() throws Exception {
+ workflowOperator = new WorkflowOperator();
+ workflowOperator.init(clusterName, workflowManagerName, zkAddress, callbackWorkflowStore);
+ runKafkaConsumer();
+ logger.info("Successfully initialized DatasyncWorkflow Manager");
+ }
+
+ public void submitDataSyncWorkflow() throws Exception {
+ AsyncDataTransferTask dt1 = new AsyncDataTransferTask();
+ dt1.setSourceResourceId("");
+ dt1.setDestinationResourceId("");
+ dt1.setSourceCredToken("");
+ dt1.setDestinationCredToken("");
+ dt1.setCallbackUrl("localhost:33335");
+ dt1.setMftHost("localhost");
+ dt1.setMftPort(7004);
+ dt1.setMftClientId("");
+ dt1.setMftClientSecret("");
+ dt1.setUserId("dimuthu");
+ dt1.setCurrentSection(1);
+ dt1.setTaskId("dt-" + UUID.randomUUID().toString());
+ dt1.setMftCallbackStoreHost(datasyncWmHost);
+ dt1.setMftCallbackStorePort(datasyncWmPort);
+
+ Map<String, AbstractTask> taskMap = new HashMap<>();
+ taskMap.put(dt1.getTaskId(), dt1);
+ //taskMap.put(bt2.getTaskId(), bt2);
+ //taskMap.put(bt3.getTaskId(), bt3);
+ //taskMap.put(bt4.getTaskId(), bt4);
+ //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()};
+ String[] startTaskIds = {dt1.getTaskId()};
+ String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
+ logger.info("Launched workflow {}", workflowId);
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackEntity.java
similarity index 51%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackEntity.java
index 9033f1a..b858350 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackEntity.java
@@ -15,28 +15,43 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync;
-import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class MFTCallbackEntity {
+ private String mftTransferId;
+ private int prevSectionIndex;
+ private String workflowId;
+ private String taskId;
-public abstract class BlockingTask extends AbstractTask {
+ public String getMftTransferId() {
+ return mftTransferId;
+ }
+
+ public void setMftTransferId(String mftTransferId) {
+ this.mftTransferId = mftTransferId;
+ }
- private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
+ public int getPrevSectionIndex() {
+ return prevSectionIndex;
+ }
- public BlockingTask() {
+ public void setPrevSectionIndex(int prevSectionIndex) {
+ this.prevSectionIndex = prevSectionIndex;
}
- @Override
- public TaskResult onRun() {
- return runBlockingCode();
+ public String getWorkflowId() {
+ return workflowId;
}
- public abstract TaskResult runBlockingCode();
+ public void setWorkflowId(String workflowId) {
+ this.workflowId = workflowId;
+ }
- @Override
- public void onCancel() {
+ public String getTaskId() {
+ return taskId;
+ }
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
}
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackStore.java
similarity index 59%
copy from data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackStore.java
index 12526a6..d17a891 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/datasync/MFTCallbackStore.java
@@ -15,31 +15,26 @@
* limitations under the License.
*/
-syntax = "proto3";
+package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.datasync;
-option java_multiple_files = true;
-package org.apache.airavata.datalake.orchestrator.workflow.engine;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
-import "google/api/annotations.proto";
-import "Common.proto";
+// TODO: Fetch these from database
+public class MFTCallbackStore {
-message WorkflowMessage {
- string message_id = 1;
- string resource_id = 2;
-}
+ private static final Map<String, MFTCallbackEntity> store = new HashMap<>();
-message WorkflowInvocationRequest {
- org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken authToken = 1;
- WorkflowMessage message = 2;
-}
+ public Optional<MFTCallbackEntity> findMFTCallback(String transferId) {
+ if (!store.containsKey(transferId)) {
+ return Optional.empty();
+ } else {
+ return Optional.of(store.get(transferId));
+ }
+ }
-message WorkflowInvocationResponse {
- bool status = 1;
+ public void saveMFTCallBack(MFTCallbackEntity callbackEntity) {
+ store.put(callbackEntity.getMftTransferId(), callbackEntity);
+ }
}
-
-
-service WorkflowService {
-
- rpc invokeWorkflow (WorkflowInvocationRequest) returns (WorkflowInvocationResponse);
-
-}
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BiSectionNonBlockingTask.java
similarity index 65%
copy from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
copy to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BiSectionNonBlockingTask.java
index 9033f1a..68b6621 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BiSectionNonBlockingTask.java
@@ -17,26 +17,22 @@
package org.apache.airavata.datalake.orchestrator.workflow.engine.task;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection;
import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class BlockingTask extends AbstractTask {
+public abstract class BiSectionNonBlockingTask extends NonBlockingTask{
- private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class);
-
- public BlockingTask() {
+ @NonBlockingSection(sectionIndex = 1)
+ public final TaskResult section1() {
+ return beforeSection();
}
- @Override
- public TaskResult onRun() {
- return runBlockingCode();
+ @NonBlockingSection(sectionIndex = 2)
+ public final TaskResult section2() {
+ return afterSection();
}
- public abstract TaskResult runBlockingCode();
-
- @Override
- public void onCancel() {
+ public abstract TaskResult beforeSection();
- }
+ public abstract TaskResult afterSection();
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/TaskParamType.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/BlockingTaskDef.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
new file mode 100644
index 0000000..91b0cc4
--- /dev/null
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
@@ -0,0 +1,193 @@
+package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl;
+
+import org.apache.airavata.datalake.orchestrator.workflow.engine.MFTCallbacSaveRequest;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.client.WorkflowEngineClient;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BiSectionNonBlockingTask;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef;
+import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.api.service.TransferApiResponse;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@NonBlockingTaskDef(name = "AsyncDataTransferTask")
+public class AsyncDataTransferTask extends BiSectionNonBlockingTask {
+
+ private final static Logger logger = LoggerFactory.getLogger(AsyncDataTransferTask.class);
+
+ @TaskParam(name = "SourceResourceId")
+ private final ThreadLocal<String> sourceResourceId = new ThreadLocal<>();
+
+ @TaskParam(name = "DestinationResourceId")
+ private final ThreadLocal<String> destinationResourceId = new ThreadLocal<>();
+
+ @TaskParam(name = "SourceCredToken")
+ private final ThreadLocal<String> sourceCredToken = new ThreadLocal<>();
+
+ @TaskParam(name = "DestinationCredToken")
+ private final ThreadLocal<String> destinationCredToken = new ThreadLocal<>();
+
+ @TaskParam(name = "UserId")
+ private final ThreadLocal<String> userId = new ThreadLocal<>();
+
+ @TaskParam(name = "CallbackUrl")
+ private final ThreadLocal<String> callbackUrl = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTAPIHost")
+ private final ThreadLocal<String> mftHost = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTAPIPort")
+ private final ThreadLocal<Integer> mftPort = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTClientId")
+ private final ThreadLocal<String> mftClientId = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTClientSecret")
+ private final ThreadLocal<String> mftClientSecret = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTCallbackStoreHost")
+ private ThreadLocal<String> mftCallbackStoreHost = new ThreadLocal<>();
+
+ @TaskParam(name = "MFTCallbackStorePort")
+ private ThreadLocal<Integer> mftCallbackStorePort = new ThreadLocal<>();
+
+
+ public TaskResult beforeSection() {
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(getMftHost(), getMftPort());
+ TransferApiResponse submitResponse = mftClient.submitTransfer(TransferApiRequest.newBuilder()
+ .setMftAuthorizationToken(AuthToken.newBuilder()
+ .setDelegateAuth(
+ DelegateAuth.newBuilder()
+ .setUserId(getUserId())
+ .setClientId(getMftClientId())
+ .setClientSecret(getMftClientSecret()).build())
+ .build())
+ .setSourceResourceId(getSourceResourceId())
+ .setSourceToken(getSourceCredToken())
+ .setDestinationResourceId(getDestinationResourceId())
+ .setDestinationToken(getDestinationCredToken())
+ .build());
+
+ logger.info("Submitted transfer {} for task id {}", submitResponse.getTransferId(), getTaskId());
+
+ WorkflowServiceGrpc.WorkflowServiceBlockingStub wfServiceClient = WorkflowEngineClient.buildClient(
+ getMftCallbackStoreHost(), getMftCallbackStorePort());
+
+
+ wfServiceClient.saveMFTCallback(MFTCallbacSaveRequest.newBuilder()
+ .setMftTransferId(submitResponse.getTransferId())
+ .setTaskId(getTaskId())
+ .setWorkflowId(getCallbackContext().getJobConfig().getWorkflow())
+ .setPrevSectionIndex(1).build());
+
+ logger.info("Part 1 completed for task id {}", getTaskId());
+ return new TaskResult(TaskResult.Status.COMPLETED, "Section 1 completed");
+ }
+
+ public TaskResult afterSection() {
+ logger.info("Transfer completed successfully");
+ return new TaskResult(TaskResult.Status.COMPLETED, "Section 2 completed");
+ }
+
+ public String getSourceResourceId() {
+ return sourceResourceId.get();
+ }
+
+ public void setSourceResourceId(String sourceResourceId) {
+ this.sourceResourceId.set(sourceResourceId);
+ }
+
+ public String getDestinationResourceId() {
+ return destinationResourceId.get();
+ }
+
+ public void setDestinationResourceId(String destinationResourceId) {
+ this.destinationResourceId.set(destinationResourceId);
+ }
+
+ public String getSourceCredToken() {
+ return sourceCredToken.get();
+ }
+
+ public void setSourceCredToken(String sourceCredToken) {
+ this.sourceCredToken.set(sourceCredToken);
+ }
+
+ public String getDestinationCredToken() {
+ return destinationCredToken.get();
+ }
+
+ public void setDestinationCredToken(String destinationCredToken) {
+ this.destinationCredToken.set(destinationCredToken);
+ }
+
+ public String getUserId() {
+ return userId.get();
+ }
+
+ public void setUserId(String userId) {
+ this.userId.set(userId);
+ }
+
+ public String getCallbackUrl() {
+ return callbackUrl.get();
+ }
+
+ public void setCallbackUrl(String callbackUrl) {
+ this.callbackUrl.set(callbackUrl);
+ }
+
+ public String getMftHost() {
+ return mftHost.get();
+ }
+
+ public void setMftHost(String mftHost) {
+ this.mftHost.set(mftHost);
+ }
+
+ public Integer getMftPort() {
+ return mftPort.get();
+ }
+
+ public void setMftPort(Integer mftPort) {
+ this.mftPort.set(mftPort);
+ }
+
+ public String getMftClientId() {
+ return mftClientId.get();
+ }
+
+ public void setMftClientId(String mftClientId) {
+ this.mftClientId.set(mftClientId);
+ }
+
+ public String getMftClientSecret() {
+ return mftClientSecret.get();
+ }
+
+ public void setMftClientSecret(String mftClientSecret) {
+ this.mftClientSecret.set( mftClientSecret);
+ }
+
+ public String getMftCallbackStoreHost() {
+ return mftCallbackStoreHost.get();
+ }
+
+ public void setMftCallbackStoreHost(String mftCallbackStoreHost) {
+ this.mftCallbackStoreHost.set(mftCallbackStoreHost);
+ }
+
+ public Integer getMftCallbackStorePort() {
+ return mftCallbackStorePort.get();
+ }
+
+ public void setMftCallbackStorePort(Integer mftCallbackStorePort) {
+ this.mftCallbackStorePort.set(mftCallbackStorePort);
+ }
+}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/application.properties b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/application.properties
similarity index 85%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/application.properties
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/application.properties
index 8b8d553..6f3a5ae 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/application.properties
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/application.properties
@@ -26,6 +26,13 @@ task.list.file=task-list.yaml
datasync.wm.name=datasync_wf
+server.port=33335
+
+kafka.url=localhost:9092
+kafka.mft.publisher.name=mft-status-publisher
+kafka.mft.status.publish.topic=mft-status-topic
+kafka.mft.status.consumer.group=mft-even-group
+
spring.datasource.url = jdbc:mysql://localhost:3306/data_orchestrator?useSSL=false&serverTimezone=UTC&useLegacyDatetimeCode=false&createDatabaseIfNotExist=true&allowPublicKeyRetrieval=true
spring.datasource.username = root
@@ -37,4 +44,7 @@ spring.jpa.properties.hibernate.enable_lazy_load_no_trans=true
# Hibernate ddl auto (create, create-drop, validate, update)
spring.jpa.hibernate.ddl-auto = update
spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
-spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
\ No newline at end of file
+spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
+
+datasync.wm.grpc.host=localhost
+datasync.wm.grpc.port=6565
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/logback.xml b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/logback.xml
similarity index 100%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/logback.xml
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/logback.xml
diff --git a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/task-list.yaml b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/task-list.yaml
similarity index 60%
rename from data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/task-list.yaml
rename to data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/task-list.yaml
index e9b17a4..456492f 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-api/src/main/resources/task-list.yaml
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/resources/task-list.yaml
@@ -2,4 +2,5 @@ tasks:
blocking:
- org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask
nonBlocking:
- - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
\ No newline at end of file
+ - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask
+ - org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask
\ No newline at end of file
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index 12526a6..64cecad 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -22,6 +22,7 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine;
import "google/api/annotations.proto";
import "Common.proto";
+import "google/protobuf/empty.proto";
message WorkflowMessage {
string message_id = 1;
@@ -37,9 +38,27 @@ message WorkflowInvocationResponse {
bool status = 1;
}
+message MFTCallbackGetRequest {
+ string mftTransferId = 1;
+}
+
+message MFTCallbackGetResponse {
+ string mftTransferId = 1;
+ int32 prevSectionIndex = 2;
+ string workflowId = 3;
+ string taskId = 4;
+}
+
+message MFTCallbacSaveRequest {
+ string mftTransferId = 1;
+ int32 prevSectionIndex = 2;
+ string workflowId = 3;
+ string taskId = 4;
+}
service WorkflowService {
rpc invokeWorkflow (WorkflowInvocationRequest) returns (WorkflowInvocationResponse);
-
+ rpc getMFTCallback(MFTCallbackGetRequest) returns (MFTCallbackGetResponse);
+ rpc saveMFTCallback(MFTCallbacSaveRequest) returns (google.protobuf.Empty);
}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 046a1cf..b6bd04f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
<snakeyaml.version>1.15</snakeyaml.version>
<yaml.version>1.15</yaml.version>
- <spring.boot.version>2.2.1.RELEASE</spring.boot.version>
+ <spring.boot.version>2.5.0</spring.boot.version>
<commons.beanutils.version>1.9.4</commons.beanutils.version>
</properties>