You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/10 09:53:57 UTC
[incubator-seatunnel] branch st-engine updated: [ST-Engine][TaskExecutionService]Add dynamic thread sharing optimization (#2366)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 37d9e0dfb [ST-Engine][TaskExecutionService]Add dynamic thread sharing optimization (#2366)
37d9e0dfb is described below
commit 37d9e0dfbbcf17835fc49c6a4ae765a9a6cad4e8
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Aug 10 17:53:51 2022 +0800
[ST-Engine][TaskExecutionService]Add dynamic thread sharing optimization (#2366)
* Add dynamic thread sharing optimization (#2279)
---
pom.xml | 7 +
seatunnel-engine/seatunnel-engine-common/pom.xml | 2 +-
.../engine/common/utils/NonCompletableFuture.java | 2 +-
seatunnel-engine/seatunnel-engine-server/pom.xml | 4 +
.../seatunnel/engine/server/SeaTunnelServer.java | 3 +-
.../engine/server/TaskExecutionService.java | 290 ++++++++++++++++-----
.../seatunnel/engine/server/execution/Task.java | 19 +-
.../engine/server/execution/TaskCallTimer.java | 146 +++++++++++
.../server/execution/TaskExecutionContext.java | 38 ++-
.../engine/server/execution/TaskGroup.java | 3 +
.../execution/{TaskGroup.java => TaskTracker.java} | 22 +-
.../seatunnel/engine/server/task/AbstractTask.java | 8 +-
.../task/SeaTunnelSplitEnumeratorContext.java | 2 +-
.../engine/server/task/SeaTunnelTask.java | 4 +-
.../server/task/SourceSplitEnumeratorTask.java | 15 +-
.../server/task/operation/RegisterOperation.java | 4 +-
.../test/execution/TaskExecutionServiceTest.java | 91 -------
.../engine/server/TaskExecutionServiceTest.java | 290 +++++++++++++++++++++
.../server/execution/ExceptionTestTask.java} | 33 ++-
.../server/execution/FixedCallTestTimeTask.java} | 55 ++--
.../engine/server/execution/StopTimeTestTask.java} | 50 ++--
.../engine/server}/execution/TestTask.java | 22 +-
22 files changed, 828 insertions(+), 282 deletions(-)
diff --git a/pom.xml b/pom.xml
index ab1634dec..0db72fd5c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,6 +209,7 @@
<guava.version>19.0</guava.version>
<auto-service.version>1.0.1</auto-service.version>
<powermock.version>2.0.9</powermock.version>
+ <awaitility.version>4.1.1</awaitility.version>
<hadoop2.version>2.6.5</hadoop2.version>
<hadoop3.version>3.0.0</hadoop3.version>
<seatunnel.shade.package>org.apache.seatunnel.shade</seatunnel.shade.package>
@@ -689,6 +690,12 @@
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
index afad57006..118479711 100644
--- a/seatunnel-engine/seatunnel-engine-common/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
- <version>${project.version}</version>
+ <version>${version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
index a3c6d2380..5bedc6900 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
@@ -62,7 +62,7 @@ public class NonCompletableFuture<T> extends CompletableFuture<T> {
throw new UnsupportedOperationException("This future can't be completed by an outside caller");
}
- private void internalComplete(T value) {
+ public void internalComplete(T value) {
super.complete(value);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml
index b193e438a..2df759f0e 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -47,6 +47,10 @@
<artifactId>scala-library</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 24a36c355..4159346e4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -70,6 +70,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
taskExecutionService = new TaskExecutionService(
nodeEngine, nodeEngine.getProperties()
);
+ taskExecutionService.start();
}
@Override
@@ -79,7 +80,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@Override
public void shutdown(boolean terminate) {
-
+ taskExecutionService.shutdown();
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index fb5eed305..6e25cc2d1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -18,29 +18,42 @@
package org.apache.seatunnel.engine.server;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static com.hazelcast.jet.impl.util.Util.uncheckRun;
+import static java.util.Collections.emptyList;
import static java.util.concurrent.Executors.newCachedThreadPool;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toList;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskCallTimer;
import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.logging.ILogger;
-import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.properties.HazelcastProperties;
import lombok.NonNull;
+import lombok.SneakyThrows;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* This class is responsible for the execution of the Task
@@ -48,12 +61,14 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TaskExecutionService {
private final String hzInstanceName;
- private final NodeEngine nodeEngine;
+ private final NodeEngineImpl nodeEngine;
private final ILogger logger;
private volatile boolean isShutdown;
- private final ExecutorService blockingTaskletExecutor = newCachedThreadPool(new BlockingTaskThreadFactory());
+ private final LinkedBlockingDeque<TaskTracker> threadShareTaskQueue = new LinkedBlockingDeque<>();
+ private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
+ private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
// key: TaskID
- private final ConcurrentMap<Long, TaskExecutionContext> executionContexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, ConcurrentMap<Long, TaskExecutionContext>> executionContexts = new ConcurrentHashMap<>();
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -61,91 +76,91 @@ public class TaskExecutionService {
this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
}
+ public void start() {
+ runBusWorkSupplier.runNewBusWork(false);
+ }
+
public void shutdown() {
isShutdown = true;
- blockingTaskletExecutor.shutdownNow();
+ executorService.shutdownNow();
}
- public TaskExecutionContext getExecutionContext(long taskId) {
- return executionContexts.get(taskId);
+ public ConcurrentMap<Long, TaskExecutionContext> getExecutionContext(long taskGroupId) {
+ return executionContexts.get(taskGroupId);
}
- /**
- * Submit a TaskGroup and run the Task in it
- */
- public Map<Long, TaskExecutionContext> submitTask(
- TaskGroup taskGroup
- ) {
- Map<Long, TaskExecutionContext> contextMap = new HashMap<>(taskGroup.getTasks().size());
- taskGroup.getTasks().forEach(task -> {
- contextMap.put(task.getTaskID(), submitTask(task));
- });
- return contextMap;
+ private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
+ tasks.stream()
+ .map(t -> new TaskTracker(t, taskGroupExecutionTracker))
+ .forEach(threadShareTaskQueue::add);
}
- public TaskExecutionContext submitTask(Task task) {
- CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
- TaskletTracker taskletTracker = new TaskletTracker(task, cancellationFuture);
- taskletTracker.taskletFutures =
- blockingTaskletExecutor.submit(new BlockingWorker(taskletTracker));
+ private void submitBlockingTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext(
- taskletTracker.future,
- cancellationFuture,
- this
- );
-
- executionContexts.put(task.getTaskID(), taskExecutionContext);
- return taskExecutionContext;
+ CountDownLatch startedLatch = new CountDownLatch(tasks.size());
+ taskGroupExecutionTracker.blockingFutures = tasks
+ .stream()
+ .map(t -> new BlockingWorker(new TaskTracker(t, taskGroupExecutionTracker), startedLatch))
+ .map(executorService::submit)
+ .collect(toList());
+ // Do not return from this method until all workers have started. Otherwise
+ // on cancellation there is a race where the executor might not have started
+ // the worker yet. This would result in taskDone() never being called for
+ // a worker.
+ uncheckRun(startedLatch::await);
}
- private final class TaskletTracker {
- final NonCompletableFuture future = new NonCompletableFuture();
- final Task task;
- volatile Future<?> taskletFutures;
-
- TaskletTracker(Task task, CompletableFuture<Void> cancellationFuture) {
- this.task = task;
-
- cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
- if (e == null) {
- e = new IllegalStateException("cancellationFuture should be completed exceptionally");
- }
- future.internalCompleteExceptionally(e);
- taskletFutures.cancel(true);
- }));
- }
-
- @Override
- public String toString() {
- return "Tracking " + task;
+ public CompletableFuture<TaskExecutionState> submitTaskGroup(
+ TaskGroup taskGroup,
+ CompletableFuture<Void> cancellationFuture
+ ) {
+ Collection<Task> tasks = taskGroup.getTasks();
+ final TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup);
+ try {
+ ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
+ final Map<Boolean, List<Task>> byCooperation =
+ tasks.stream()
+ .peek(x -> {
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext(x, nodeEngine);
+ x.setTaskExecutionContext(taskExecutionContext);
+ taskExecutionContextMap.put(x.getTaskID(), taskExecutionContext);
+ })
+ .collect(partitioningBy(Task::isThreadsShare));
+ submitThreadShareTask(executionTracker, byCooperation.get(true));
+ submitBlockingTask(executionTracker, byCooperation.get(false));
+ executionContexts.put(taskGroup.getId(), taskExecutionContextMap);
+ } catch (Throwable t) {
+ executionTracker.future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
}
+ return new NonCompletableFuture<>(executionTracker.future);
}
private final class BlockingWorker implements Runnable {
- private final TaskletTracker tracker;
+ private final TaskTracker tracker;
+ private final CountDownLatch startedLatch;
- private BlockingWorker(TaskletTracker tracker) {
+ private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {
this.tracker = tracker;
+ this.startedLatch = startedLatch;
}
@Override
public void run() {
final Task t = tracker.task;
try {
+ startedLatch.countDown();
t.init();
ProgressState result;
do {
result = t.call();
- } while (!result.isDone() && !isShutdown && !tracker.taskletFutures.isCancelled());
-
+ } while (!result.isDone() && !isShutdown && !tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
} catch (Throwable e) {
logger.warning("Exception in " + t, e);
- tracker.future.internalCompleteExceptionally(e);
+ tracker.taskGroupExecutionTracker.exception(e);
} finally {
- tracker.future.internalComplete();
+ tracker.taskGroupExecutionTracker.taskDone();
}
}
}
@@ -156,7 +171,162 @@ public class TaskExecutionService {
@Override
public Thread newThread(@NonNull Runnable r) {
return new Thread(r,
- String.format("hz.%s.seaTunnel.blocking.thread-%d", hzInstanceName, seq.getAndIncrement()));
+ String.format("hz.%s.seaTunnel.task.thread-%d", hzInstanceName, seq.getAndIncrement()));
+ }
+ }
+
+ /**
+ * CooperativeTaskWorker repeatedly polls the call method of the queued tasks.
+ * When a task call times out, a new BusWork is created to take over the execution of the other queued tasks
+ */
+ public final class CooperativeTaskWorker implements Runnable {
+
+ AtomicBoolean keep = new AtomicBoolean(true);
+ public AtomicReference<TaskTracker> exclusiveTaskTracker = new AtomicReference<>();
+ final TaskCallTimer timer;
+ public LinkedBlockingDeque<TaskTracker> taskqueue;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public CooperativeTaskWorker(LinkedBlockingDeque<TaskTracker> taskqueue, RunBusWorkSupplier runBusWorkSupplier) {
+ logger.info(String.format("Created new BusWork : %s", this.hashCode()));
+ this.taskqueue = taskqueue;
+ this.timer = new TaskCallTimer(50, keep, runBusWorkSupplier, this);
+ }
+
+ @SneakyThrows
+ @Override
+ public void run() {
+ while (keep.get()) {
+ TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
+ exclusiveTaskTracker.get() :
+ taskqueue.takeFirst();
+ TaskGroupExecutionTracker taskGroupExecutionTracker = taskTracker.taskGroupExecutionTracker;
+ if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
+ taskGroupExecutionTracker.taskDone();
+ if (null != exclusiveTaskTracker.get()) {
+ // If it's exclusive need to end the work
+ break;
+ } else {
+ // No action required and don't put back
+ continue;
+ }
+ }
+ //start timer, if it's exclusive, don't need to start
+ if (null == exclusiveTaskTracker.get()) {
+ timer.timerStart(taskTracker);
+ }
+ ProgressState call = null;
+ try {
+ //run task
+ call = taskTracker.task.call();
+ synchronized (timer) {
+ timer.timerStop();
+ }
+ } catch (Throwable e) {
+ //task Failure and complete
+ taskGroupExecutionTracker.exception(e);
+ taskGroupExecutionTracker.taskDone();
+ //If it's exclusive need to end the work
+ logger.warning("Exception in " + taskTracker.task, e);
+ if (null != exclusiveTaskTracker.get()) {
+ break;
+ }
+ } finally {
+ //stop timer
+ timer.timerStop();
+ }
+ //task call finished
+ if (null != call) {
+ if (call.isDone()) {
+ //If it's exclusive, you need to end the work
+ taskGroupExecutionTracker.taskDone();
+ if (null != exclusiveTaskTracker.get()) {
+ break;
+ }
+ } else {
+ //Task is not completed. Put task to the end of the queue
+ //If the current work has an exclusive tracker, it will not be put back
+ if (null == exclusiveTaskTracker.get()) {
+ taskqueue.offer(taskTracker);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Used to create a new BusWork and run it
+ */
+ public final class RunBusWorkSupplier {
+
+ ExecutorService executorService;
+ LinkedBlockingDeque<TaskTracker> taskQueue;
+
+ public RunBusWorkSupplier(ExecutorService executorService, LinkedBlockingDeque<TaskTracker> taskqueue) {
+ this.executorService = executorService;
+ this.taskQueue = taskqueue;
+ }
+
+ public boolean runNewBusWork(boolean checkTaskQueue) {
+ if (!checkTaskQueue || taskQueue.size() > 0) {
+ executorService.submit(new CooperativeTaskWorker(taskQueue, this));
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Internal utility class to track the overall state of a task group's execution.
+ * There's one instance of this class per submitted TaskGroup.
+ */
+ public final class TaskGroupExecutionTracker {
+
+ private final TaskGroup taskGroup;
+ final CompletableFuture<TaskExecutionState> future = new CompletableFuture<>();
+ volatile List<Future<?>> blockingFutures = emptyList();
+
+ private final AtomicInteger completionLatch;
+ private final AtomicReference<Throwable> executionException = new AtomicReference<>();
+
+ private final AtomicBoolean isCancel = new AtomicBoolean(false);
+
+ TaskGroupExecutionTracker(CompletableFuture<Void> cancellationFuture, TaskGroup taskGroup) {
+ this.completionLatch = new AtomicInteger(taskGroup.getTasks().size());
+ this.taskGroup = taskGroup;
+ cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
+ isCancel.set(true);
+ if (e == null) {
+ e = new IllegalStateException("cancellationFuture should be completed exceptionally");
+ }
+ exception(e);
+ // Don't interrupt the threads. We require that they do not block for too long,
+ // interrupting them might make the termination faster, but can also cause troubles.
+ blockingFutures.forEach(f -> f.cancel(false));
+ }));
+ }
+
+ void exception(Throwable t) {
+ executionException.compareAndSet(null, t);
+ }
+
+ void taskDone() {
+ if (completionLatch.decrementAndGet() == 0) {
+ executionContexts.remove(taskGroup.getId());
+ Throwable ex = executionException.get();
+ if (ex == null) {
+ future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FINISHED, null));
+ } else if (isCancel.get()) {
+ future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, ex));
+ } else {
+ future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, ex));
+ }
+ }
+ }
+
+ boolean executionCompletedExceptionally() {
+ return executionException.get() != null;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 9025facce..a35d63223 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -17,14 +17,10 @@
package org.apache.seatunnel.engine.server.execution;
-import com.hazelcast.spi.impl.operationservice.Operation;
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
import java.io.IOException;
import java.io.Serializable;
-import java.util.UUID;
public interface Task extends Serializable {
@@ -37,24 +33,13 @@ public interface Task extends Serializable {
@NonNull
Long getTaskID();
- default boolean isCooperative() {
+ default boolean isThreadsShare() {
return false;
}
default void close() throws IOException {
}
- default void setOperationService(OperationService operationService) {
+ default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext){
}
-
- default <E> InvocationFuture<E> sendToMaster(Operation operation) {
- // TODO add method send operation to master
- return null;
- }
-
- default <E> InvocationFuture<E> sendToMember(Operation operation, UUID memberID) {
- // TODO add method send operation to member
- return null;
- }
-
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskCallTimer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskCallTimer.java
new file mode 100644
index 000000000..d4514babf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskCallTimer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * TaskCallTimer is a timer that watches how long a Task's call method takes to execute
+ */
+public class TaskCallTimer extends Thread {
+
+ long nextExecutionTime;
+ long delay;
+
+ TaskExecutionService.CooperativeTaskWorker cooperativeTaskWorker;
+ AtomicBoolean keep;
+ TaskExecutionService.RunBusWorkSupplier runBusWorkSupplier;
+
+ TaskTracker taskTracker;
+
+ private final Object lock = new Object();
+ boolean started = false;
+ AtomicBoolean wait0 = new AtomicBoolean(false);
+
+ public TaskCallTimer(
+ long delay,
+ AtomicBoolean keep,
+ TaskExecutionService.RunBusWorkSupplier runBusWorkSupplier,
+ TaskExecutionService.CooperativeTaskWorker cooperativeTaskWorker) {
+ this.delay = delay;
+ this.keep = keep;
+ this.runBusWorkSupplier = runBusWorkSupplier;
+ this.cooperativeTaskWorker = cooperativeTaskWorker;
+ }
+
+ private void startTimer() {
+ nextExecutionTime = System.currentTimeMillis() + delay;
+ this.start();
+ }
+
+ public void reSet(long tmpDelay) {
+ nextExecutionTime = System.currentTimeMillis() + tmpDelay;
+ if (started) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ } else {
+ started = true;
+ this.start();
+ }
+ }
+
+ public void reSet() {
+ nextExecutionTime = System.currentTimeMillis() + delay;
+ if (!started) {
+ started = true;
+ this.start();
+ }
+
+ }
+
+ public void timerStart(TaskTracker taskTracker) {
+ wait0.set(false);
+ this.taskTracker = taskTracker;
+ nextExecutionTime = System.currentTimeMillis() + delay;
+ if (started) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ } else {
+ started = true;
+ this.start();
+ }
+ }
+
+ public void timerStop() {
+ wait0.set(true);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ long currentTime;
+ long executionTime;
+ boolean wait;
+ try {
+ synchronized (this) {
+ wait = wait0.get();
+ currentTime = System.currentTimeMillis();
+ executionTime = this.nextExecutionTime;
+ if (!wait && executionTime <= currentTime) {
+ timeoutAct(this.taskTracker.expiredTimes.incrementAndGet());
+ break;
+ }
+ }
+ if (wait) {
+ synchronized (lock) {
+ lock.wait();
+ }
+ } else {
+ synchronized (lock) {
+ lock.wait(executionTime - currentTime);
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * The action to be performed when the task call method execution times out
+ */
+ private void timeoutAct(int expiredTimes) {
+ if (expiredTimes >= 1) {
+ // 1 busWork keep on running
+ keep.set(true);
+ // 2 busWork exclusive to the current taskTracker
+ cooperativeTaskWorker.exclusiveTaskTracker.set(taskTracker);
+ // 3 Submit a new BusWork to execute other tasks
+ runBusWorkSupplier.runNewBusWork(false);
+ } else {
+ // 1 Stop the current busWork from continuing to execute the new Task
+ keep.set(false);
+ // 2 Submit a new BusWork to execute other tasks
+ runBusWorkSupplier.runNewBusWork(false);
+ }
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 3cd8c6734..baccf0e34 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -17,33 +17,31 @@
package org.apache.seatunnel.engine.server.execution;
-import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import java.util.concurrent.CompletableFuture;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
public class TaskExecutionContext {
- // future which is Task submit
- public volatile CompletableFuture<Void> executionFuture;
+ private final Task task;
+ private final NodeEngineImpl nodeEngine;
- // future which can only be used to cancel the local execution.
- private volatile CompletableFuture<Void> cancellationFuture;
-
- private TaskExecutionService taskExecutionService;
-
- public TaskExecutionContext(
- CompletableFuture<Void> executionFuture,
- CompletableFuture<Void> cancellationFuture,
- TaskExecutionService taskExecutionService
- ) {
- this.executionFuture = executionFuture;
- this.cancellationFuture = cancellationFuture;
- this.taskExecutionService = taskExecutionService;
+ public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
+ this.task = task;
+ this.nodeEngine = nodeEngine;
}
- public CompletableFuture<Void> cancel() {
- cancellationFuture.cancel(true);
- return executionFuture;
+ public <E> InvocationFuture<E> sendToMaster(Operation operation) {
+ InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
+ return invocationBuilder.invoke();
}
+ public <E> InvocationFuture<E> sendToMember(Operation operation, Address memberID) {
+ InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, memberID);
+ return invocationBuilder.invoke();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index c0123277c..764a1cdf8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -24,6 +24,9 @@ import java.util.Collection;
@Data
public class TaskGroup implements Serializable {
+ //TODO id is required. The constructor needs to be modified later
+ private long id;
+
private final String taskGroupName;
private final Collection<Task> tasks;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskTracker.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskTracker.java
index c0123277c..2ab25d3b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskTracker.java
@@ -17,14 +17,22 @@
package org.apache.seatunnel.engine.server.execution;
-import lombok.Data;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
-import java.io.Serializable;
-import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
-@Data
-public class TaskGroup implements Serializable {
- private final String taskGroupName;
+public class TaskTracker {
+ public final AtomicInteger expiredTimes = new AtomicInteger(0);
+ public final TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker;
+ public final Task task;
- private final Collection<Task> tasks;
+ public TaskTracker(Task task, TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker) {
+ this.task = task;
+ this.taskGroupExecutionTracker = taskGroupExecutionTracker;
+ }
+
+ @Override
+ public String toString() {
+ return "Tracking " + task;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index e15689c86..49ae36ceb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
-import com.hazelcast.spi.impl.operationservice.OperationService;
import lombok.NonNull;
import java.net.URL;
@@ -29,7 +29,7 @@ import java.util.Set;
public abstract class AbstractTask implements Task {
private static final long serialVersionUID = -2524701323779523718L;
- protected OperationService operationService;
+ protected TaskExecutionContext executionContext;
protected long taskID;
protected Progress progress;
@@ -42,8 +42,8 @@ public abstract class AbstractTask implements Task {
public abstract Set<URL> getJarsUrl();
@Override
- public void setOperationService(OperationService operationService) {
- this.operationService = operationService;
+ public void setTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
+ this.executionContext = taskExecutionContext;
}
@NonNull
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
index 319bc4ec6..ab206b0b7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
@@ -48,7 +48,7 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> impleme
@Override
public void assignSplit(int subtaskId, List<SplitT> splits) {
- task.sendToMember(new AssignSplitOperation<>(subtaskId, splits), task.getTaskMemberID(subtaskId));
+ task.executionContext.sendToMember(new AssignSplitOperation<>(subtaskId, splits), task.getTaskMemberAddr(subtaskId));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b5c381999..44c48aa26 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -59,12 +59,12 @@ public class SeaTunnelTask extends AbstractTask {
private void register() {
if (startFromSource()) {
- sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
+ this.executionContext.sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
}
}
private void requestSplit() {
- sendToMaster(new RequestSplitOperation(taskID, enumeratorTaskID));
+ this.executionContext.sendToMaster(new RequestSplitOperation(taskID, enumeratorTaskID));
}
private boolean startFromSource() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 4605d52f9..8fd425b31 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -21,13 +21,14 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
+import com.hazelcast.cluster.Address;
+
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends CoordinatorTask {
@@ -36,7 +37,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
private final PhysicalSourceAction<?, SplitT, ?> source;
private SourceSplitEnumerator<?, ?> enumerator;
private SeaTunnelSplitEnumeratorContext<SplitT> context;
- private Map<Integer, UUID> taskMemberMapping;
+ private Map<Integer, Address> taskMemberMapping;
@Override
public void init() throws Exception {
@@ -58,8 +59,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
this.source = source;
}
- private void receivedReader(int readerId, UUID memberID) {
- this.addTaskMemberMapping(readerId, memberID);
+ private void receivedReader(int readerId, Address memberAddr) {
+ this.addTaskMemberMapping(readerId, memberAddr);
enumerator.registerReader(readerId);
}
@@ -67,11 +68,11 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
enumerator.handleSplitRequest(taskID);
}
- private void addTaskMemberMapping(int taskID, UUID memberID) {
- taskMemberMapping.put(taskID, memberID);
+ private void addTaskMemberMapping(int taskID, Address memberAddr) {
+ taskMemberMapping.put(taskID, memberAddr);
}
- public UUID getTaskMemberID(int taskID) {
+ public Address getTaskMemberAddr(int taskID) {
return taskMemberMapping.get(taskID);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
index 5c26c3a39..334f0152e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
@@ -28,6 +28,7 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
/**
* For {@link org.apache.seatunnel.api.source.SourceReader} to register with
@@ -50,8 +51,7 @@ public class RegisterOperation extends Operation implements IdentifiedDataSerial
public void run() throws Exception {
SeaTunnelServer server = getService();
UUID readerUUID = getCallerUuid();
- TaskExecutionContext executionContext =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+ ConcurrentMap<Long, TaskExecutionContext> executionContextMap = server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
// TODO register reader to enumerator
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TaskExecutionServiceTest.java
deleted file mode 100644
index 014e62efd..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TaskExecutionServiceTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package execution;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
-import org.apache.seatunnel.engine.server.TaskExecutionService;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import com.hazelcast.logging.ILogger;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-
-
-public class TaskExecutionServiceTest {
-
- HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext())).getOriginal();
- SeaTunnelServer service = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
- ILogger logger = instance.node.nodeEngine.getLogger(TaskExecutionServiceTest.class);
-
-
- @Test
- public void testAll() throws InterruptedException {
- testCancel();
- testFinish();
- }
-
- public void testCancel() throws InterruptedException {
- TaskExecutionService taskExecutionService = service.getTaskExecutionService();
-
- AtomicBoolean stop = new AtomicBoolean(false);
- TestTask testTask = new TestTask(stop, logger);
-
- TaskExecutionContext taskExecutionContext = taskExecutionService.submitTask(testTask);
-
- Thread.sleep(3000);
-
- taskExecutionContext.cancel();
-
- Thread.sleep(30000);
- assertTrue(taskExecutionContext.executionFuture.isCompletedExceptionally());
- }
-
- public void testFinish() throws InterruptedException {
- TaskExecutionService taskExecutionService = service.getTaskExecutionService();
-
- AtomicBoolean stop = new AtomicBoolean(false);
- AtomicBoolean futureMark = new AtomicBoolean(false);
- TestTask testTasklet = new TestTask(stop, logger);
-
- TaskExecutionContext taskExecutionContext = taskExecutionService.submitTask(testTasklet);
- taskExecutionContext.executionFuture.whenComplete(new BiConsumer<Void, Throwable>() {
- @Override
- public void accept(Void unused, Throwable throwable) {
- futureMark.set(true);
- }
- });
-
- Thread.sleep(3000);
-
- stop.set(true);
-
- Thread.sleep(1000);
-
- assertTrue(taskExecutionContext.executionFuture.isDone());
- assertTrue(futureMark.get());
- }
-}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
new file mode 100644
index 000000000..0bf524cb6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import static org.apache.seatunnel.engine.server.execution.ExecutionState.CANCELED;
+import static org.apache.seatunnel.engine.server.execution.ExecutionState.FAILED;
+import static org.apache.seatunnel.engine.server.execution.ExecutionState.FINISHED;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import com.hazelcast.logging.ILogger;
+import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
+import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
+import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
+import org.apache.seatunnel.engine.server.execution.TestTask;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+
+public class TaskExecutionServiceTest {
+
+ HazelcastInstanceImpl instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(new Config(), Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+ SeaTunnelServer service = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+ ILogger logger = instance.node.nodeEngine.getLogger(TaskExecutionServiceTest.class);
+
+ long taskRunTime = 2000;
+
+ @Test
+ public void testAll() throws InterruptedException, ExecutionException {
+ logger.info("----------start Cancel test----------");
+ testCancel();
+
+ logger.info("----------start Finish test----------");
+ testFinish();
+
+ logger.info("----------start Delay test----------");
+ testDelay();
+ testDelay();
+
+ logger.info("----------start ThrowException test----------");
+ testThrowException();
+
+ logger.info("----------start CriticalCallTime test----------");
+ testCriticalCallTime();
+
+ }
+
+ public void testCancel() {
+ TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+ long sleepTime = 300;
+
+ AtomicBoolean stop = new AtomicBoolean(false);
+ TestTask testTask1 = new TestTask(stop, logger, sleepTime,true);
+ TestTask testTask2 = new TestTask(stop, logger, sleepTime,false);
+
+ CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+
+ CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+
+ cancellationFuture.cancel(true);
+
+ await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(()->{
+ assertEquals(CANCELED, completableFuture.get().getExecutionState());
+ });
+ }
+
+ public void testFinish() {
+ TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+ long sleepTime = 300;
+
+ AtomicBoolean stop = new AtomicBoolean(false);
+ AtomicBoolean futureMark = new AtomicBoolean(false);
+ TestTask testTask1 = new TestTask(stop, logger, sleepTime,true);
+ TestTask testTask2 = new TestTask(stop, logger, sleepTime,false);
+
+ CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+
+ CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+ completableFuture.whenComplete(new BiConsumer<TaskExecutionState, Throwable>() {
+ @Override
+ public void accept(TaskExecutionState unused, Throwable throwable) {
+ futureMark.set(true);
+ }
+ });
+
+ stop.set(true);
+
+ await().atMost(sleepTime + 100, TimeUnit.MILLISECONDS).untilAsserted(()->{
+ assertEquals(FINISHED, completableFuture.get().getExecutionState());
+ });
+
+ assertTrue(futureMark.get());
+ }
+
+
+ /**
+ * Tests the case where each task's call time equals the timer timeout.
+ */
+ public void testCriticalCallTime() throws InterruptedException {
+ AtomicBoolean stopMark = new AtomicBoolean(false);
+ CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
+
+ int count = 100;
+
+ //Must be the same as the timer timeout
+ int callTime = 50;
+
+ //Create tasks with critical delays
+ List<Task> criticalTask = buildStopTestTask(callTime, count, stopMark, stopTime);
+
+ TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+ CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(criticalTask)), new CompletableFuture<Void>());
+
+ // Run it for a while
+ Thread.sleep(taskRunTime);
+
+ //stop task
+ stopMark.set(true);
+
+ // Check that all tasks finish correctly
+ await().atMost(count * callTime, TimeUnit.MILLISECONDS).untilAsserted(()->{
+ assertEquals(FINISHED, taskCts.get().getExecutionState());
+ });
+
+ //Check that each task reported DONE exactly once
+ assertEquals(count, stopTime.size());
+
+ }
+
+ public void testThrowException() throws InterruptedException {
+ TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+ AtomicBoolean stopMark = new AtomicBoolean(false);
+
+ long t1Sleep = 100;
+ long t2Sleep = 50;
+
+ long lowLagSleep = 50;
+ long highLagSleep = 300;
+
+ List<Throwable> t1throwable = new ArrayList<>();
+ ExceptionTestTask t1 = new ExceptionTestTask(t1Sleep, "t1", t1throwable);
+
+ List<Throwable> t2throwable = new ArrayList<>();
+ ExceptionTestTask t2 = new ExceptionTestTask(t2Sleep, "t2", t2throwable);
+
+ //Create low-lag tasks
+ List<Task> lowLagTask = buildFixedTestTask(lowLagSleep, 10, stopMark, new CopyOnWriteArrayList<>());
+
+ //Create high-lag tasks
+ List<Task> highLagTask = buildFixedTestTask(highLagSleep, 5, stopMark, new CopyOnWriteArrayList<>());
+
+ List<Task> tasks = new ArrayList<>();
+ tasks.addAll(highLagTask);
+ tasks.addAll(lowLagTask);
+ Collections.shuffle(tasks);
+
+
+ CompletableFuture<TaskExecutionState> taskCts = taskExecutionService.submitTaskGroup(new TaskGroup("ts", Lists.newArrayList(tasks)), new CompletableFuture<Void>());
+
+
+ CompletableFuture<TaskExecutionState> t1c = taskExecutionService.submitTaskGroup(new TaskGroup("t1", Lists.newArrayList(t1)), new CompletableFuture<Void>());
+
+ CompletableFuture<TaskExecutionState> t2c = taskExecutionService.submitTaskGroup(new TaskGroup("t2", Lists.newArrayList(t2)), new CompletableFuture<Void>());
+
+ Thread.sleep(taskRunTime);
+
+ t1throwable.add(new IOException());
+ t2throwable.add(new IOException());
+
+ await().atMost(t1Sleep + t2Sleep, TimeUnit.MILLISECONDS).untilAsserted(()->{
+ assertEquals(FAILED, t1c.get().getExecutionState());
+ assertEquals(FAILED, t2c.get().getExecutionState());
+ });
+
+ stopMark.set(true);
+
+ await().atMost(lowLagSleep * 10 + highLagSleep, TimeUnit.MILLISECONDS).untilAsserted(()->{
+ assertEquals(FINISHED, taskCts.get().getExecutionState());
+ });
+ }
+
+ public void testDelay() throws InterruptedException {
+
+ long lowLagSleep = 10;
+ long highLagSleep = 300;
+
+ AtomicBoolean stopMark = new AtomicBoolean(false);
+
+ CopyOnWriteArrayList<Long> lowLagList = new CopyOnWriteArrayList<>();
+ CopyOnWriteArrayList<Long> highLagList = new CopyOnWriteArrayList<>();
+
+ //Create low-lag tasks
+ List<Task> lowLagTask = buildFixedTestTask(lowLagSleep, 10, stopMark, lowLagList);
+
+ //Create high-lag tasks
+ List<Task> highLagTask = buildFixedTestTask(highLagSleep, 5, stopMark, highLagList);
+
+ List<Task> tasks = new ArrayList<>();
+ tasks.addAll(highLagTask);
+ tasks.addAll(lowLagTask);
+ Collections.shuffle(tasks);
+
+ TaskGroup taskGroup = new TaskGroup("ts", Lists.newArrayList(tasks));
+
+
+ logger.info("task size is : " + taskGroup.getTasks().size());
+
+ TaskExecutionService taskExecutionService = service.getTaskExecutionService();
+
+ CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+
+ CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(taskGroup, cancellationFuture);
+
+ //Let the tasks run for a while, then stop them
+ Thread.sleep(taskRunTime);
+ stopMark.set(true);
+
+ //Check that all tasks finish correctly
+ await().atMost(lowLagSleep * 10 + highLagSleep, TimeUnit.MILLISECONDS).untilAsserted(()->{
+ assertEquals(FINISHED, completableFuture.get().getExecutionState());
+ });
+
+ //Compute the average delay
+ double lowAvg = lowLagList.stream().mapToLong(x -> x).average().getAsDouble();
+ double highAvg = highLagList.stream().mapToLong(x -> x).average().getAsDouble();
+
+ assertTrue(lowAvg < 400);
+
+ logger.info("lowAvg : " + lowAvg);
+ logger.info("highAvg : " + highAvg);
+
+ }
+
+ public List<Task> buildFixedTestTask(long callTime, long count, AtomicBoolean stopMart, CopyOnWriteArrayList<Long> lagList) {
+ List<Task> taskQueue = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ taskQueue.add(new FixedCallTestTimeTask(callTime, callTime + "t" + i, stopMart, lagList));
+ }
+ return taskQueue;
+ }
+
+ public List<Task> buildStopTestTask(long callTime, long count, AtomicBoolean stopMart, CopyOnWriteArrayList<Long> stopList) {
+ List<Task> taskQueue = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ taskQueue.add(new StopTimeTestTask(callTime, stopList, stopMart));
+ }
+ return taskQueue;
+ }
+
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/ExceptionTestTask.java
similarity index 59%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/ExceptionTestTask.java
index c0123277c..6815e29e2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/ExceptionTestTask.java
@@ -17,14 +17,33 @@
package org.apache.seatunnel.engine.server.execution;
-import lombok.Data;
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+import lombok.SneakyThrows;
-import java.io.Serializable;
-import java.util.Collection;
+import java.util.List;
-@Data
-public class TaskGroup implements Serializable {
- private final String taskGroupName;
+@AllArgsConstructor
+public class ExceptionTestTask implements Task {
+ long callTime;
+ String name;
+ List<Throwable> throwE;
- private final Collection<Task> tasks;
+ @SneakyThrows
+ @NonNull
+ @Override
+ public ProgressState call() {
+ if(!throwE.isEmpty()){
+ throw throwE.get(0);
+ }else {
+ Thread.sleep(callTime);
+ }
+ return ProgressState.MADE_PROGRESS;
+ }
+
+ @NonNull
+ @Override
+ public Long getTaskID() {
+ return (long) this.hashCode();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/FixedCallTestTimeTask.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/FixedCallTestTimeTask.java
index 21ae87bfe..d3022287b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/FixedCallTestTimeTask.java
@@ -15,49 +15,54 @@
* limitations under the License.
*/
-package execution;
+package org.apache.seatunnel.engine.server.execution;
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
-
-import com.hazelcast.logging.ILogger;
import lombok.NonNull;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-/**
- * For test use, only print logs
- */
-public class TestTask implements Task {
-
+public class FixedCallTestTimeTask implements Task {
+ long callTime;
+ String name;
+ long currentTime;
+ CopyOnWriteArrayList<Long> lagList;
AtomicBoolean stop;
- private final ILogger logger;
- public TestTask(AtomicBoolean stop, ILogger logger){
+ public FixedCallTestTimeTask(long callTime, String name, AtomicBoolean stop, CopyOnWriteArrayList<Long> lagList){
+ this.callTime = callTime;
+ this.name = name;
this.stop = stop;
- this.logger = logger;
+ this.lagList = lagList;
}
@NonNull
@Override
public ProgressState call() {
- ProgressState progressState;
- if (!stop.get()){
- logger.info("TestTasklet is running");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- progressState = ProgressState.MADE_PROGRESS;
- }else {
- progressState = ProgressState.DONE;
+ if(currentTime != 0){
+ lagList.add(System.currentTimeMillis() - currentTime);
+ }
+ currentTime = System.currentTimeMillis();
+
+ try {
+ Thread.sleep(callTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e.toString());
}
- return progressState;
+ if(stop.get()){
+ return ProgressState.DONE;
+ }
+ return ProgressState.MADE_PROGRESS;
}
@NonNull
@Override
public Long getTaskID() {
- return 1L;
+ return (long) this.hashCode();
+ }
+
+ @Override
+ public boolean isThreadsShare() {
+ return true;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/StopTimeTestTask.java
similarity index 56%
copy from seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/StopTimeTestTask.java
index 21ae87bfe..5bc1bb50c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/StopTimeTestTask.java
@@ -15,49 +15,43 @@
* limitations under the License.
*/
-package execution;
+package org.apache.seatunnel.engine.server.execution;
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
-
-import com.hazelcast.logging.ILogger;
+import lombok.AllArgsConstructor;
import lombok.NonNull;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-/**
- * For test use, only print logs
- */
-public class TestTask implements Task {
-
+@AllArgsConstructor
+public class StopTimeTestTask implements Task {
+ long callTime;
+ CopyOnWriteArrayList<Long> stopList;
AtomicBoolean stop;
- private final ILogger logger;
-
- public TestTask(AtomicBoolean stop, ILogger logger){
- this.stop = stop;
- this.logger = logger;
- }
@NonNull
@Override
public ProgressState call() {
- ProgressState progressState;
- if (!stop.get()){
- logger.info("TestTasklet is running");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- progressState = ProgressState.MADE_PROGRESS;
- }else {
- progressState = ProgressState.DONE;
+ try {
+ Thread.sleep(callTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e.toString());
}
- return progressState;
+ if(stop.get()){
+ stopList.add(Thread.currentThread().getId());
+ return ProgressState.DONE;
+ }
+ return ProgressState.MADE_PROGRESS;
}
@NonNull
@Override
public Long getTaskID() {
- return 1L;
+ return (long) this.hashCode();
+ }
+
+ @Override
+ public boolean isThreadsShare() {
+ return Task.super.isThreadsShare();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/TestTask.java
similarity index 76%
rename from seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/TestTask.java
index 21ae87bfe..8fbc6aa9a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/test/execution/TestTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/test/org/apache/seatunnel/engine/server/execution/TestTask.java
@@ -15,10 +15,7 @@
* limitations under the License.
*/
-package execution;
-
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
+package org.apache.seatunnel.engine.server.execution;
import com.hazelcast.logging.ILogger;
import lombok.NonNull;
@@ -31,11 +28,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class TestTask implements Task {
AtomicBoolean stop;
+ long sleep;
private final ILogger logger;
+ boolean isThreadsShare;
- public TestTask(AtomicBoolean stop, ILogger logger){
+ public TestTask(AtomicBoolean stop, ILogger logger, long sleep, boolean isThreadsShare){
this.stop = stop;
this.logger = logger;
+ this.sleep = sleep;
+ this.isThreadsShare = isThreadsShare;
}
@NonNull
@@ -43,9 +44,9 @@ public class TestTask implements Task {
public ProgressState call() {
ProgressState progressState;
if (!stop.get()){
- logger.info("TestTasklet is running");
+ logger.info("TestTask is running.........");
try {
- Thread.sleep(1000);
+ Thread.sleep(sleep);
} catch (InterruptedException e) {
}
progressState = ProgressState.MADE_PROGRESS;
@@ -58,6 +59,11 @@ public class TestTask implements Task {
@NonNull
@Override
public Long getTaskID() {
- return 1L;
+ return (long) this.hashCode();
+ }
+
+ @Override
+ public boolean isThreadsShare() {
+ return isThreadsShare;
}
}