You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/09 02:32:39 UTC

[GitHub] [incubator-seatunnel] Hisoka-X opened a new pull request, #2386: [Engine][Task] Add task runtime logic

Hisoka-X opened a new pull request, #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   Add task runtime logic
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changed are covered with tests, or it does not need tests for reason:
   * [ ] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r942058414


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.transform.Collector;
+
+import java.util.concurrent.BlockingQueue;
+
+public class IntermediateQueueFlowLifeCycle<T> implements OneInputFlowLifeCycle<T>, OneOutputFlowLifeCycle<T> {
+
+    private final BlockingQueue<T> queue;
+
+    public IntermediateQueueFlowLifeCycle(BlockingQueue<T> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void received(T row) {
+        try {
+            queue.put(row);

Review Comment:
   add to java queue in memory, batch can't change anything, row alway put in queue one by one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r944406116


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/FlowConfig.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical.config;
+
+import java.io.Serializable;
+
+/**
+ * This interface indicates that this class is the configuration information of Flow
+ */
+public interface FlowConfig extends Serializable {

Review Comment:
   Can you introduce the scenario of using this interface?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java:
##########
@@ -57,14 +73,24 @@ private static class Factory implements DataSerializableFactory {
         @Override
         public IdentifiedDataSerializable create(int typeId) {
             switch (typeId) {
-                case REGISTER_TYPE:
-                    return new RegisterOperation();
+                case SOURCE_REGISTER_TYPE:
+                    return new SourceRegisterOperation();
                 case REQUEST_SPLIT_TYPE:
                     return new RequestSplitOperation();
                 case ASSIGN_SPLIT_TYPE:
-                    return new AssignSplitOperation();
+                    return new AssignSplitOperation<>();
                 case TASK_GROUP_INFO_TYPE:
                     return new TaskGroupImmutableInformation();
+                case SOURCE_UNREGISTER_TYPE:
+                    return new SourceUnregisterOperation();
+                case SINK_REGISTER_TYPE:
+                    return new SinkRegisterOperation();
+                case SINK_UNREGISTER_TYPE:
+                    return new SinkUnregisterOperation();
+                case TASK_LOCATION_TYPE:
+                    return new TaskLocation();
+                case PROGRESS_TYPE:
+                    return new Progress();
                 default:
                     return null;

Review Comment:
   I suggest throw a `throw new IllegalArgumentException("Unknown type id " + typeId);` here.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java:
##########
@@ -42,8 +42,8 @@ public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
 
     @Override
     protected NonCompletableFuture<?> doRun() throws Exception {
-        TaskExecutionService taskExecutionService = getService();
-        NonCompletableFuture<TaskExecutionState> voidCompletableFuture = taskExecutionService.deployTask(taskImmutableInformation);
+        SeaTunnelServer server = getService();
+        NonCompletableFuture<TaskExecutionState> voidCompletableFuture =  server.getTaskExecutionService().deployTask(taskImmutableInformation);
         return voidCompletableFuture;

Review Comment:
   ```suggestion
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/PhysicalPlanTest.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag;
+
+import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import com.hazelcast.spi.impl.NodeEngine;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+
+public class PhysicalPlanTest {
+
+    @Test
+    public void testLogicalToPhysical() throws MalformedURLException {
+
+        HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+        NodeEngine nodeEngine = instance.node.nodeEngine;
+
+        IdGenerator idGenerator = new IdGenerator();
+
+        Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(),
+                Collections.singletonList(new URL("file:///fake.jar")));

Review Comment:
   Where can we find `fake.jar` ?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java:
##########
@@ -30,48 +50,95 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 
-public class SeaTunnelTask extends AbstractTask {
+public abstract class SeaTunnelTask extends AbstractTask {
 
     private static final long serialVersionUID = 2604309561613784425L;
     private final Flow executionFlow;
 
-    // TODO init memberID in task execution service
-    private UUID memberID = UUID.randomUUID();
-    private int enumeratorTaskID = -1;
+    protected FlowLifeCycle startFlowLifeCycle;
 
-    public SeaTunnelTask(long taskID, Flow executionFlow) {
-        super(taskID);
-        // TODO add enumerator task ID
-        enumeratorTaskID = 1;
+    protected List<OneInputFlowLifeCycle<Record>> outputs;
+
+    protected int indexID;
+
+    private TaskGroup taskBelongGroup;
+
+    public SeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) {
+        super(jobID, taskID);
+        this.indexID = indexID;
         this.executionFlow = executionFlow;
     }
 
     @Override
-    public void init() {
+    public void init() throws Exception {
+        super.init();
+        startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
     }
 
-    @Override
-    public void close() throws IOException {
-        super.close();
+    public void setTaskGroup(TaskGroup group) {
+        this.taskBelongGroup = group;
     }
 
-    private void register() {
-        if (startFromSource()) {
-            this.executionContext.sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
+    @SuppressWarnings({"unchecked", "rawtypes", "checkstyle:MagicNumber"})
+    private FlowLifeCycle convertFlowToActionLifeCycle(Flow flow) throws Exception {

Review Comment:
   I suggest add `@NonNull` if the params must not be null.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java:
##########
@@ -17,20 +17,64 @@
 
 package org.apache.seatunnel.engine.server.task;
 
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
 
 import java.net.URL;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
-public class SinkAggregatedCommitterTask extends CoordinatorTask {
+public class SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends CoordinatorTask {
 
+    private static final ILogger LOGGER = Logger.getLogger(SinkAggregatedCommitterTask.class);
     private static final long serialVersionUID = 5906594537520393503L;
-    private final SinkAction<?, ?, ?, ?> sink;
+    private final SinkAction<?, ?, ?, AggregatedCommitInfoT> sink;
+
+    private final SinkAggregatedCommitter<?, AggregatedCommitInfoT> aggregatedCommitter;
+
+    private final Map<Long, Address> writerAddressMap;
+
+    private final Map<Long, List<AggregatedCommitInfoT>> checkpointCommitInfoMap;
 
-    public SinkAggregatedCommitterTask(long taskID, SinkAction<?, ?, ?, ?> sink) {
-        super(taskID);
+    public SinkAggregatedCommitterTask(long jobID, TaskLocation taskID, SinkAction<?, ?, ?, AggregatedCommitInfoT> sink,
+                                       SinkAggregatedCommitter<?, AggregatedCommitInfoT> aggregatedCommitter) {
+        super(jobID, taskID);
         this.sink = sink;
+        this.aggregatedCommitter = aggregatedCommitter;
+        this.writerAddressMap = new ConcurrentHashMap<>();
+        this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void init() throws Exception {
+        super.init();
+        LOGGER.info("starting seatunnel source split enumerator task, sink name: " + sink.getName());

Review Comment:
   `source split enumerator task` shouldn't be here. 



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java:
##########
@@ -44,7 +44,7 @@ public Address applyForResource(@NonNull Long jobId, @NonNull Long taskId) {
             }
 
             Address localhost =
-                jobAddressMap.putIfAbsent(taskId, new Address("localhost", 5801));
+                jobAddressMap.putIfAbsent(taskId, new Address("192.168.2.1", 5701));

Review Comment:
   `192.168.2.1` can not be use here.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java:
##########
@@ -42,8 +42,8 @@ public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
 
     @Override
     protected NonCompletableFuture<?> doRun() throws Exception {
-        TaskExecutionService taskExecutionService = getService();
-        NonCompletableFuture<TaskExecutionState> voidCompletableFuture = taskExecutionService.deployTask(taskImmutableInformation);
+        SeaTunnelServer server = getService();
+        NonCompletableFuture<TaskExecutionState> voidCompletableFuture =  server.getTaskExecutionService().deployTask(taskImmutableInformation);

Review Comment:
   ```suggestion
           return server.getTaskExecutionService().deployTask(taskImmutableInformation);
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
+import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import org.apache.seatunnel.engine.server.execution.ProgressState;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.task.flow.OneOutputFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+public class TransformSeaTunnelTask extends SeaTunnelTask {
+
+    private static final ILogger LOGGER = Logger.getLogger(TransformSeaTunnelTask.class);
+
+    public TransformSeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) {
+        super(jobID, taskID, indexID, executionFlow);
+    }
+
+    private Collector<Record> collector;
+
+    @Override
+    public void init() throws Exception {
+        super.init();
+        LOGGER.info("starting seatunnel transform task, index " + indexID);
+        collector = new SeaTunnelTransformCollector<>(outputs);
+    }
+
+    @Override
+    protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(SourceAction<?, ?, ?> sourceAction,
+                                                                  SourceConfig config) {
+        throw new UnsupportedOperationException("MiddleSeaTunnelTask can't create SourceFlowLifeCycle");
+    }
+
+    @NonNull
+    @Override
+    @SuppressWarnings("unchecked")
+    public ProgressState call() throws Exception {
+        if (startFlowLifeCycle instanceof OneOutputFlowLifeCycle) {

Review Comment:
   I suggest we check the really type before the task begin running and read data. We should try our best to avoid Exceptions when processing data. Especially when these exceptions can be checked before processing data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#issuecomment-1214769671

   +1, cc @CalvinKirs 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r944513813


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java:
##########
@@ -17,20 +17,64 @@
 
 package org.apache.seatunnel.engine.server.task;
 
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
 
 import java.net.URL;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
-public class SinkAggregatedCommitterTask extends CoordinatorTask {
+public class SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends CoordinatorTask {
 
+    private static final ILogger LOGGER = Logger.getLogger(SinkAggregatedCommitterTask.class);
     private static final long serialVersionUID = 5906594537520393503L;
-    private final SinkAction<?, ?, ?, ?> sink;
+    private final SinkAction<?, ?, ?, AggregatedCommitInfoT> sink;
+
+    private final SinkAggregatedCommitter<?, AggregatedCommitInfoT> aggregatedCommitter;
+
+    private final Map<Long, Address> writerAddressMap;
+
+    private final Map<Long, List<AggregatedCommitInfoT>> checkpointCommitInfoMap;
 
-    public SinkAggregatedCommitterTask(long taskID, SinkAction<?, ?, ?, ?> sink) {
-        super(taskID);
+    public SinkAggregatedCommitterTask(long jobID, TaskLocation taskID, SinkAction<?, ?, ?, AggregatedCommitInfoT> sink,
+                                       SinkAggregatedCommitter<?, AggregatedCommitInfoT> aggregatedCommitter) {
+        super(jobID, taskID);
         this.sink = sink;
+        this.aggregatedCommitter = aggregatedCommitter;
+        this.writerAddressMap = new ConcurrentHashMap<>();
+        this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void init() throws Exception {
+        super.init();
+        LOGGER.info("starting seatunnel source split enumerator task, sink name: " + sink.getName());

Review Comment:
   My fault, fixed it.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
+import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import org.apache.seatunnel.engine.server.execution.ProgressState;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.task.flow.OneOutputFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+public class TransformSeaTunnelTask extends SeaTunnelTask {
+
+    private static final ILogger LOGGER = Logger.getLogger(TransformSeaTunnelTask.class);
+
+    public TransformSeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) {
+        super(jobID, taskID, indexID, executionFlow);
+    }
+
+    private Collector<Record> collector;
+
+    @Override
+    public void init() throws Exception {
+        super.init();
+        LOGGER.info("starting seatunnel transform task, index " + indexID);
+        collector = new SeaTunnelTransformCollector<>(outputs);
+    }
+
+    @Override
+    protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(SourceAction<?, ?, ?> sourceAction,
+                                                                  SourceConfig config) {
+        throw new UnsupportedOperationException("MiddleSeaTunnelTask can't create SourceFlowLifeCycle");
+    }
+
+    @NonNull
+    @Override
+    @SuppressWarnings("unchecked")
+    public ProgressState call() throws Exception {
+        if (startFlowLifeCycle instanceof OneOutputFlowLifeCycle) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r942017385


##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Record.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+/**
+ * Contain {@link SeaTunnelRow} or Checkpoint Barrier
+ */
+public class Record {

Review Comment:
   Does `Record` need implements `IdentifiedDataSerializable`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.transform.Collector;
+
+import java.util.concurrent.BlockingQueue;
+
+public class IntermediateQueueFlowLifeCycle<T> implements OneInputFlowLifeCycle<T>, OneOutputFlowLifeCycle<T> {
+
+    private final BlockingQueue<T> queue;
+
+    public IntermediateQueueFlowLifeCycle(BlockingQueue<T> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void received(T row) {
+        try {
+            queue.put(row);

Review Comment:
   Will we consider supporting batch put



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.transform.Collector;
+
+import java.util.concurrent.BlockingQueue;
+
+public class IntermediateQueueFlowLifeCycle<T> implements OneInputFlowLifeCycle<T>, OneOutputFlowLifeCycle<T> {
+
+    private final BlockingQueue<T> queue;
+
+    public IntermediateQueueFlowLifeCycle(BlockingQueue<T> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void received(T row) {
+        try {
+            queue.put(row);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void collect(Collector<T> collector) throws Exception {
+        while (true) {
+            T record = queue.poll();
+            if (record != null) {
+                collector.collect(record);
+            } else {
+                break;

Review Comment:
   This function will never return if there is always data in the queue. Is this what we expect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs merged pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
CalvinKirs merged PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r942072658


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.transform.Collector;
+
+import java.util.concurrent.BlockingQueue;
+
+public class IntermediateQueueFlowLifeCycle<T> implements OneInputFlowLifeCycle<T>, OneOutputFlowLifeCycle<T> {
+
+    private final BlockingQueue<T> queue;
+
+    public IntermediateQueueFlowLifeCycle(BlockingQueue<T> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void received(T row) {
+        try {
+            queue.put(row);

Review Comment:
   > add to java queue in memory, batch can't change anything, row alway put in queue one by one.
   
   I see datax support batch put, so I have this question.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r942096624


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.transform.Collector;
+
+import java.util.concurrent.BlockingQueue;
+
+public class IntermediateQueueFlowLifeCycle<T> implements OneInputFlowLifeCycle<T>, OneOutputFlowLifeCycle<T> {
+
+    private final BlockingQueue<T> queue;
+
+    public IntermediateQueueFlowLifeCycle(BlockingQueue<T> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void received(T row) {
+        try {
+            queue.put(row);

Review Comment:
   > > add to java queue in memory, batch can't change anything, row alway put in queue one by one.
   > 
   > I see datax support batch put, so I have this question.
   
   We can support batch put into (read from) hezalcast queue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r942059133


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.transform.Collector;
+
+import java.util.concurrent.BlockingQueue;
+
+public class IntermediateQueueFlowLifeCycle<T> implements OneInputFlowLifeCycle<T>, OneOutputFlowLifeCycle<T> {
+
+    private final BlockingQueue<T> queue;
+
+    public IntermediateQueueFlowLifeCycle(BlockingQueue<T> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void received(T row) {
+        try {
+            queue.put(row);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void collect(Collector<T> collector) throws Exception {
+        while (true) {
+            T record = queue.poll();
+            if (record != null) {
+                collector.collect(record);
+            } else {
+                break;

Review Comment:
   Yes, the data will be handled to sink by `collector.collect(record)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r942057238


##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Record.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+/**
+ * Contain {@link SeaTunnelRow} or Checkpoint Barrier
+ */
+public class Record {

Review Comment:
   I will add it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r942074661


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.transform.Collector;
+
+import java.util.concurrent.BlockingQueue;
+
+public class IntermediateQueueFlowLifeCycle<T> implements OneInputFlowLifeCycle<T>, OneOutputFlowLifeCycle<T> {
+
+    private final BlockingQueue<T> queue;
+
+    public IntermediateQueueFlowLifeCycle(BlockingQueue<T> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void received(T row) {
+        try {
+            queue.put(row);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void collect(Collector<T> collector) throws Exception {
+        while (true) {
+            T record = queue.poll();
+            if (record != null) {
+                collector.collect(record);
+            } else {
+                break;

Review Comment:
   > Yes, the data will be handled to sink by `collector.collect(record)`
   
   Get it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r944514706


##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/PhysicalPlanTest.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag;
+
+import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import com.hazelcast.spi.impl.NodeEngine;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+
+public class PhysicalPlanTest {
+
+    @Test
+    public void testLogicalToPhysical() throws MalformedURLException {
+
+        HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+        NodeEngine nodeEngine = instance.node.nodeEngine;
+
+        IdGenerator idGenerator = new IdGenerator();
+
+        Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(),
+                Collections.singletonList(new URL("file:///fake.jar")));

Review Comment:
   I just for test, not use the jar path at now.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java:
##########
@@ -30,48 +50,95 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 
-public class SeaTunnelTask extends AbstractTask {
+public abstract class SeaTunnelTask extends AbstractTask {
 
     private static final long serialVersionUID = 2604309561613784425L;
     private final Flow executionFlow;
 
-    // TODO init memberID in task execution service
-    private UUID memberID = UUID.randomUUID();
-    private int enumeratorTaskID = -1;
+    protected FlowLifeCycle startFlowLifeCycle;
 
-    public SeaTunnelTask(long taskID, Flow executionFlow) {
-        super(taskID);
-        // TODO add enumerator task ID
-        enumeratorTaskID = 1;
+    protected List<OneInputFlowLifeCycle<Record>> outputs;
+
+    protected int indexID;
+
+    private TaskGroup taskBelongGroup;
+
+    public SeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) {
+        super(jobID, taskID);
+        this.indexID = indexID;
         this.executionFlow = executionFlow;
     }
 
     @Override
-    public void init() {
+    public void init() throws Exception {
+        super.init();
+        startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
     }
 
-    @Override
-    public void close() throws IOException {
-        super.close();
+    public void setTaskGroup(TaskGroup group) {
+        this.taskBelongGroup = group;
     }
 
-    private void register() {
-        if (startFromSource()) {
-            this.executionContext.sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
+    @SuppressWarnings({"unchecked", "rawtypes", "checkstyle:MagicNumber"})
+    private FlowLifeCycle convertFlowToActionLifeCycle(Flow flow) throws Exception {

Review Comment:
   Done



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java:
##########
@@ -57,14 +73,24 @@ private static class Factory implements DataSerializableFactory {
         @Override
         public IdentifiedDataSerializable create(int typeId) {
             switch (typeId) {
-                case REGISTER_TYPE:
-                    return new RegisterOperation();
+                case SOURCE_REGISTER_TYPE:
+                    return new SourceRegisterOperation();
                 case REQUEST_SPLIT_TYPE:
                     return new RequestSplitOperation();
                 case ASSIGN_SPLIT_TYPE:
-                    return new AssignSplitOperation();
+                    return new AssignSplitOperation<>();
                 case TASK_GROUP_INFO_TYPE:
                     return new TaskGroupImmutableInformation();
+                case SOURCE_UNREGISTER_TYPE:
+                    return new SourceUnregisterOperation();
+                case SINK_REGISTER_TYPE:
+                    return new SinkRegisterOperation();
+                case SINK_UNREGISTER_TYPE:
+                    return new SinkUnregisterOperation();
+                case TASK_LOCATION_TYPE:
+                    return new TaskLocation();
+                case PROGRESS_TYPE:
+                    return new Progress();
                 default:
                     return null;

Review Comment:
   Done



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java:
##########
@@ -44,7 +44,7 @@ public Address applyForResource(@NonNull Long jobId, @NonNull Long taskId) {
             }
 
             Address localhost =
-                jobAddressMap.putIfAbsent(taskId, new Address("localhost", 5801));
+                jobAddressMap.putIfAbsent(taskId, new Address("192.168.2.1", 5701));

Review Comment:
   Sorry, I test it in local with my ip. Wrong commit. I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r945056514


##########
seatunnel-engine/seatunnel-engine-server/pom.xml:
##########
@@ -51,6 +51,19 @@
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
         </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r944517336


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/FlowConfig.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical.config;
+
+import java.io.Serializable;
+
+/**
+ * This interface indicates that this class is the configuration information of Flow
+ */
+public interface FlowConfig extends Serializable {

Review Comment:
   Some Flows require corresponding config to function properly. For example, the source needs to record the enumerator ID.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r944515018


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java:
##########
@@ -42,8 +42,8 @@ public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
 
     @Override
     protected NonCompletableFuture<?> doRun() throws Exception {
-        TaskExecutionService taskExecutionService = getService();
-        NonCompletableFuture<TaskExecutionState> voidCompletableFuture = taskExecutionService.deployTask(taskImmutableInformation);
+        SeaTunnelServer server = getService();
+        NonCompletableFuture<TaskExecutionState> voidCompletableFuture =  server.getTaskExecutionService().deployTask(taskImmutableInformation);
         return voidCompletableFuture;

Review Comment:
   Fixed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2386: [Engine][Task] Add task runtime logic

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2386:
URL: https://github.com/apache/incubator-seatunnel/pull/2386#discussion_r944581816


##########
seatunnel-engine/seatunnel-engine-server/pom.xml:
##########
@@ -51,6 +51,19 @@
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
         </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.seatunnel</groupId>
               <artifactId>connector-fake</artifactId>
               <version>${project.version}</version>
               <scope>test</scope>
           </dependency>
   ```
   
   add test scope



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org