You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/11 07:49:34 UTC
[incubator-seatunnel] branch st-engine updated: add TaskGroup interface (#2395)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 3d6482368 add TaskGroup interface (#2395)
3d6482368 is described below
commit 3d6482368a2e4fb88f472038ae2b53484a9959fb
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Thu Aug 11 15:49:29 2022 +0800
add TaskGroup interface (#2395)
---
.../engine/server/TaskExecutionService.java | 9 +++++---
.../server/dag/physical/PhysicalPlanGenerator.java | 10 ++++-----
.../engine/server/dag/physical/PhysicalVertex.java | 6 +++---
.../engine/server/execution/TaskGroup.java | 16 +++++++-------
.../{TaskGroup.java => TaskGroupContext.java} | 12 ++---------
.../{TaskGroup.java => TaskGroupDefaultImpl.java} | 25 +++++++++++++++++++---
.../server/task/operation/RegisterOperation.java | 5 ++---
.../engine/server/TaskExecutionServiceTest.java | 16 +++++++-------
8 files changed, 56 insertions(+), 43 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 6e25cc2d1..4d15b319a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.server.execution.TaskCallTimer;
import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import com.hazelcast.logging.ILogger;
@@ -68,7 +69,7 @@ public class TaskExecutionService {
private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
// key: TaskID
- private final ConcurrentMap<Long, ConcurrentMap<Long, TaskExecutionContext>> executionContexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -85,7 +86,7 @@ public class TaskExecutionService {
executorService.shutdownNow();
}
- public ConcurrentMap<Long, TaskExecutionContext> getExecutionContext(long taskGroupId) {
+ public TaskGroupContext getExecutionContext(long taskGroupId) {
return executionContexts.get(taskGroupId);
}
@@ -115,6 +116,7 @@ public class TaskExecutionService {
TaskGroup taskGroup,
CompletableFuture<Void> cancellationFuture
) {
+ taskGroup.init();
Collection<Task> tasks = taskGroup.getTasks();
final TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup);
try {
@@ -129,7 +131,8 @@ public class TaskExecutionService {
.collect(partitioningBy(Task::isThreadsShare));
submitThreadShareTask(executionTracker, byCooperation.get(true));
submitBlockingTask(executionTracker, byCooperation.get(false));
- executionContexts.put(taskGroup.getId(), taskExecutionContextMap);
+ taskGroup.setTasksContext(taskExecutionContextMap);
+ executionContexts.put(taskGroup.getId(), new TaskGroupContext(taskGroup));
} catch (Throwable t) {
executionTracker.future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 68da4e796..056faa0d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -34,7 +34,7 @@ import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutio
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
@@ -158,7 +158,7 @@ public class PhysicalPlanGenerator {
return new PhysicalVertex(atomicInteger.incrementAndGet(),
executorService,
collect.size(),
- new TaskGroup("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
+ new TaskGroupDefaultImpl("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
taskFuture,
flakeIdGenerator,
pipelineIndex,
@@ -185,7 +185,7 @@ public class PhysicalPlanGenerator {
t.add(new PhysicalVertex(i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroup("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
+ new TaskGroupDefaultImpl("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
taskFuture,
flakeIdGenerator,
pipelineIndex,
@@ -210,7 +210,7 @@ public class PhysicalPlanGenerator {
return new PhysicalVertex(atomicInteger.incrementAndGet(),
executorService,
sources.size(),
- new TaskGroup(s.getName(), Lists.newArrayList(t)),
+ new TaskGroupDefaultImpl(s.getName(), Lists.newArrayList(t)),
taskFuture,
flakeIdGenerator,
pipelineIndex,
@@ -246,7 +246,7 @@ public class PhysicalPlanGenerator {
t.add(new PhysicalVertex(i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroup("SourceTask",
+ new TaskGroupDefaultImpl("SourceTask",
taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
taskFuture,
flakeIdGenerator,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index fbc568a62..879226c82 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.logging.ILogger;
@@ -52,7 +52,7 @@ public class PhysicalVertex {
private final int parallelism;
- private final TaskGroup taskGroup;
+ private final TaskGroupDefaultImpl taskGroup;
private final ExecutorService executorService;
@@ -79,7 +79,7 @@ public class PhysicalVertex {
public PhysicalVertex(int subTaskGroupIndex,
@NonNull ExecutorService executorService,
int parallelism,
- @NonNull TaskGroup taskGroup,
+ @NonNull TaskGroupDefaultImpl taskGroup,
@NonNull CompletableFuture<TaskExecutionState> taskFuture,
@NonNull FlakeIdGenerator flakeIdGenerator,
int pipelineIndex,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index 764a1cdf8..783b1ae36 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -17,17 +17,17 @@
package org.apache.seatunnel.engine.server.execution;
-import lombok.Data;
-
import java.io.Serializable;
import java.util.Collection;
+import java.util.Map;
+
+public interface TaskGroup extends Serializable {
+
+ public long getId();
-@Data
-public class TaskGroup implements Serializable {
- //TODO iD is required. The construction method needs to be modified later
- private long id;
+ public void init();
- private final String taskGroupName;
+ public Collection<Task> getTasks();
- private final Collection<Task> tasks;
+ public void setTasksContext(Map<Long, TaskExecutionContext> taskExecutionContextMap);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
similarity index 75%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
index 764a1cdf8..c5b7c74d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
@@ -19,15 +19,7 @@ package org.apache.seatunnel.engine.server.execution;
import lombok.Data;
-import java.io.Serializable;
-import java.util.Collection;
-
@Data
-public class TaskGroup implements Serializable {
- //TODO iD is required. The construction method needs to be modified later
- private long id;
-
- private final String taskGroupName;
-
- private final Collection<Task> tasks;
+public class TaskGroupContext {
+ final TaskGroup taskGroup;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
similarity index 72%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index 764a1cdf8..feff9249b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -19,15 +19,34 @@ package org.apache.seatunnel.engine.server.execution;
import lombok.Data;
-import java.io.Serializable;
import java.util.Collection;
+import java.util.Map;
@Data
-public class TaskGroup implements Serializable {
- //TODO iD is required. The construction method needs to be modified later
+public class TaskGroupDefaultImpl implements TaskGroup{
private long id;
private final String taskGroupName;
private final Collection<Task> tasks;
+
+ @Override
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public void init() {
+
+ }
+
+ @Override
+ public Collection<Task> getTasks() {
+ return tasks;
+ }
+
+ @Override
+ public void setTasksContext(Map<Long, TaskExecutionContext> taskExecutionContextMap) {
+
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
index 334f0152e..f548018ba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.task.operation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
@@ -28,7 +28,6 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
/**
* For {@link org.apache.seatunnel.api.source.SourceReader} to register with
@@ -51,7 +50,7 @@ public class RegisterOperation extends Operation implements IdentifiedDataSerial
public void run() throws Exception {
SeaTunnelServer server = getService();
UUID readerUUID = getCallerUuid();
- ConcurrentMap<Long, TaskExecutionContext> executionContextMap = server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+ TaskGroupContext executionContext = server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
// TODO register reader to enumerator
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 0bf524cb6..d86fc2080 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
import com.google.common.collect.Lists;
import com.hazelcast.config.Config;
@@ -38,6 +37,7 @@ import com.hazelcast.logging.ILogger;
import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TestTask;
import org.junit.Test;
@@ -92,7 +92,7 @@ public class TaskExecutionServiceTest {
CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
- CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+ CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
cancellationFuture.cancel(true);
@@ -113,7 +113,7 @@ public class TaskExecutionServiceTest {
CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
- CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+ CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
completableFuture.whenComplete(new BiConsumer<TaskExecutionState, Throwable>() {
@Override
public void accept(TaskExecutionState unused, Throwable throwable) {
@@ -148,7 +148,7 @@ public class TaskExecutionServiceTest {
TaskExecutionService taskExecutionService = service.getTaskExecutionService();
- CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(criticalTask)), new CompletableFuture<Void>());
+ CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("t1", Lists.newArrayList(criticalTask)), new CompletableFuture<Void>());
// Run it for a while
Thread.sleep(taskRunTime);
@@ -195,12 +195,12 @@ public class TaskExecutionServiceTest {
Collections.shuffle(tasks);
- CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(tasks)), new CompletableFuture<Void>());
+ CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(tasks)), new CompletableFuture<Void>());
- CompletableFuture<TaskExecutionState> t1c = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(t1)), new CompletableFuture<Void>());
+ CompletableFuture<TaskExecutionState> t1c = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("t1", Lists.newArrayList(t1)), new CompletableFuture<Void>());
- CompletableFuture<TaskExecutionState> t2c = taskExecutionService.submitTaskGroup(new TaskGroup("t2", Lists.newArrayList(t2)), new CompletableFuture<Void>());
+ CompletableFuture<TaskExecutionState> t2c = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("t2", Lists.newArrayList(t2)), new CompletableFuture<Void>());
Thread.sleep(taskRunTime);
@@ -240,7 +240,7 @@ public class TaskExecutionServiceTest {
tasks.addAll(lowLagTask);
Collections.shuffle(tasks);
- TaskGroup taskGroup = new TaskGroup("ts", Lists.newArrayList(tasks));
+ TaskGroupDefaultImpl taskGroup = new TaskGroupDefaultImpl("ts", Lists.newArrayList(tasks));
logger.info("task size is : " + taskGroup.getTasks().size());