You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/11 07:49:34 UTC

[incubator-seatunnel] branch st-engine updated: add TaskGroup interface (#2395)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 3d6482368 add TaskGroup interface (#2395)
3d6482368 is described below

commit 3d6482368a2e4fb88f472038ae2b53484a9959fb
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Thu Aug 11 15:49:29 2022 +0800

    add TaskGroup interface (#2395)
---
 .../engine/server/TaskExecutionService.java        |  9 +++++---
 .../server/dag/physical/PhysicalPlanGenerator.java | 10 ++++-----
 .../engine/server/dag/physical/PhysicalVertex.java |  6 +++---
 .../engine/server/execution/TaskGroup.java         | 16 +++++++-------
 .../{TaskGroup.java => TaskGroupContext.java}      | 12 ++---------
 .../{TaskGroup.java => TaskGroupDefaultImpl.java}  | 25 +++++++++++++++++++---
 .../server/task/operation/RegisterOperation.java   |  5 ++---
 .../engine/server/TaskExecutionServiceTest.java    | 16 +++++++-------
 8 files changed, 56 insertions(+), 43 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 6e25cc2d1..4d15b319a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.server.execution.TaskCallTimer;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
 
 import com.hazelcast.logging.ILogger;
@@ -68,7 +69,7 @@ public class TaskExecutionService {
     private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
     private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
     // key: TaskID
-    private final ConcurrentMap<Long, ConcurrentMap<Long, TaskExecutionContext>> executionContexts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -85,7 +86,7 @@ public class TaskExecutionService {
         executorService.shutdownNow();
     }
 
-    public ConcurrentMap<Long, TaskExecutionContext> getExecutionContext(long taskGroupId) {
+    public TaskGroupContext getExecutionContext(long taskGroupId) {
         return executionContexts.get(taskGroupId);
     }
 
@@ -115,6 +116,7 @@ public class TaskExecutionService {
         TaskGroup taskGroup,
         CompletableFuture<Void> cancellationFuture
     ) {
+        taskGroup.init();
         Collection<Task> tasks = taskGroup.getTasks();
         final TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup);
         try {
@@ -129,7 +131,8 @@ public class TaskExecutionService {
                     .collect(partitioningBy(Task::isThreadsShare));
             submitThreadShareTask(executionTracker, byCooperation.get(true));
             submitBlockingTask(executionTracker, byCooperation.get(false));
-            executionContexts.put(taskGroup.getId(), taskExecutionContextMap);
+            taskGroup.setTasksContext(taskExecutionContextMap);
+            executionContexts.put(taskGroup.getId(), new TaskGroupContext(taskGroup));
         } catch (Throwable t) {
             executionTracker.future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 68da4e796..056faa0d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -34,7 +34,7 @@ import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutio
 import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
 import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
@@ -158,7 +158,7 @@ public class PhysicalPlanGenerator {
                 return new PhysicalVertex(atomicInteger.incrementAndGet(),
                     executorService,
                     collect.size(),
-                    new TaskGroup("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
+                    new TaskGroupDefaultImpl("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
                     taskFuture,
                     flakeIdGenerator,
                     pipelineIndex,
@@ -185,7 +185,7 @@ public class PhysicalPlanGenerator {
                     t.add(new PhysicalVertex(i,
                         executorService,
                         flow.getAction().getParallelism(),
-                        new TaskGroup("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
+                        new TaskGroupDefaultImpl("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
                         taskFuture,
                         flakeIdGenerator,
                         pipelineIndex,
@@ -210,7 +210,7 @@ public class PhysicalPlanGenerator {
             return new PhysicalVertex(atomicInteger.incrementAndGet(),
                 executorService,
                 sources.size(),
-                new TaskGroup(s.getName(), Lists.newArrayList(t)),
+                new TaskGroupDefaultImpl(s.getName(), Lists.newArrayList(t)),
                 taskFuture,
                 flakeIdGenerator,
                 pipelineIndex,
@@ -246,7 +246,7 @@ public class PhysicalPlanGenerator {
                     t.add(new PhysicalVertex(i,
                         executorService,
                         flow.getAction().getParallelism(),
-                        new TaskGroup("SourceTask",
+                        new TaskGroupDefaultImpl("SourceTask",
                             taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
                         taskFuture,
                         flakeIdGenerator,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index fbc568a62..879226c82 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.logging.ILogger;
@@ -52,7 +52,7 @@ public class PhysicalVertex {
 
     private final int parallelism;
 
-    private final TaskGroup taskGroup;
+    private final TaskGroupDefaultImpl taskGroup;
 
     private final ExecutorService executorService;
 
@@ -79,7 +79,7 @@ public class PhysicalVertex {
     public PhysicalVertex(int subTaskGroupIndex,
                           @NonNull ExecutorService executorService,
                           int parallelism,
-                          @NonNull TaskGroup taskGroup,
+                          @NonNull TaskGroupDefaultImpl taskGroup,
                           @NonNull CompletableFuture<TaskExecutionState> taskFuture,
                           @NonNull FlakeIdGenerator flakeIdGenerator,
                           int pipelineIndex,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index 764a1cdf8..783b1ae36 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -17,17 +17,17 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import lombok.Data;
-
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Map;
+
+public interface TaskGroup extends Serializable {
+
+    public long getId();
 
-@Data
-public class TaskGroup implements Serializable {
-    //TODO iD is required. The construction method needs to be modified later
-    private long id;
+    public void init();
 
-    private final String taskGroupName;
+    public Collection<Task> getTasks();
 
-    private final Collection<Task> tasks;
+    public void setTasksContext(Map<Long, TaskExecutionContext> taskExecutionContextMap);
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
similarity index 75%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
index 764a1cdf8..c5b7c74d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
@@ -19,15 +19,7 @@ package org.apache.seatunnel.engine.server.execution;
 
 import lombok.Data;
 
-import java.io.Serializable;
-import java.util.Collection;
-
 @Data
-public class TaskGroup implements Serializable {
-    //TODO iD is required. The construction method needs to be modified later
-    private long id;
-
-    private final String taskGroupName;
-
-    private final Collection<Task> tasks;
+public class TaskGroupContext {
+    final TaskGroup taskGroup;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
similarity index 72%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index 764a1cdf8..feff9249b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -19,15 +19,34 @@ package org.apache.seatunnel.engine.server.execution;
 
 import lombok.Data;
 
-import java.io.Serializable;
 import java.util.Collection;
+import java.util.Map;
 
 @Data
-public class TaskGroup implements Serializable {
-    //TODO iD is required. The construction method needs to be modified later
+public class TaskGroupDefaultImpl implements TaskGroup{
     private long id;
 
     private final String taskGroupName;
 
     private final Collection<Task> tasks;
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public void init() {
+
+    }
+
+    @Override
+    public Collection<Task> getTasks() {
+        return tasks;
+    }
+
+    @Override
+    public void setTasksContext(Map<Long, TaskExecutionContext> taskExecutionContextMap) {
+
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
index 334f0152e..f548018ba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.task.operation;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -28,7 +28,6 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * For {@link org.apache.seatunnel.api.source.SourceReader} to register with
@@ -51,7 +50,7 @@ public class RegisterOperation extends Operation implements IdentifiedDataSerial
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         UUID readerUUID = getCallerUuid();
-        ConcurrentMap<Long, TaskExecutionContext> executionContextMap = server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+        TaskGroupContext executionContext = server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
         // TODO register reader to enumerator
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 0bf524cb6..d86fc2080 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
 
 import com.google.common.collect.Lists;
 import com.hazelcast.config.Config;
@@ -38,6 +37,7 @@ import com.hazelcast.logging.ILogger;
 import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
 import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
 import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TestTask;
 import org.junit.Test;
 
@@ -92,7 +92,7 @@ public class TaskExecutionServiceTest {
 
         CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
 
-        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
 
         cancellationFuture.cancel(true);
 
@@ -113,7 +113,7 @@ public class TaskExecutionServiceTest {
 
         CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
 
-        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
         completableFuture.whenComplete(new BiConsumer<TaskExecutionState, Throwable>() {
             @Override
             public void accept(TaskExecutionState unused, Throwable throwable) {
@@ -148,7 +148,7 @@ public class TaskExecutionServiceTest {
 
         TaskExecutionService taskExecutionService = service.getTaskExecutionService();
 
-        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(criticalTask)), new CompletableFuture<Void>());
+        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("t1", Lists.newArrayList(criticalTask)), new CompletableFuture<Void>());
 
         // Run it for a while
         Thread.sleep(taskRunTime);
@@ -195,12 +195,12 @@ public class TaskExecutionServiceTest {
         Collections.shuffle(tasks);
 
 
-        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(tasks)), new CompletableFuture<Void>());
+        CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(tasks)), new CompletableFuture<Void>());
 
 
-        CompletableFuture<TaskExecutionState> t1c = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(t1)), new CompletableFuture<Void>());
+        CompletableFuture<TaskExecutionState> t1c = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("t1", Lists.newArrayList(t1)), new CompletableFuture<Void>());
 
-        CompletableFuture<TaskExecutionState> t2c = taskExecutionService.submitTaskGroup(new TaskGroup("t2", Lists.newArrayList(t2)), new CompletableFuture<Void>());
+        CompletableFuture<TaskExecutionState> t2c = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("t2", Lists.newArrayList(t2)), new CompletableFuture<Void>());
 
         Thread.sleep(taskRunTime);
 
@@ -240,7 +240,7 @@ public class TaskExecutionServiceTest {
         tasks.addAll(lowLagTask);
         Collections.shuffle(tasks);
 
-        TaskGroup taskGroup = new TaskGroup("ts", Lists.newArrayList(tasks));
+        TaskGroupDefaultImpl taskGroup = new TaskGroupDefaultImpl("ts", Lists.newArrayList(tasks));
 
 
         logger.info("task size is : " + taskGroup.getTasks().size());