You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/22 11:59:30 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][Task] Fix task running error. (#2500)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 730215258 [Engine][Task] Fix task running error. (#2500)
730215258 is described below
commit 730215258ba1fec6249acde37e32bb05c9f6492a
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Aug 22 19:59:24 2022 +0800
[Engine][Task] Fix task running error. (#2500)
---
.../engine/client/SeaTunnelClientTest.java | 2 --
.../server/operation/DeployTaskOperation.java | 6 ++--
.../serializable/TaskDataSerializerHook.java | 9 ++++--
.../engine/server/task/SeaTunnelTask.java | 8 ++++-
.../server/task/SourceSplitEnumeratorTask.java | 30 ++++++++++++-------
.../server/task/flow/SourceFlowLifeCycle.java | 5 ++--
.../server/task/flow/TransformFlowLifeCycle.java | 6 ++++
.../task/group/TaskGroupWithIntermediateQueue.java | 8 ++---
...erOperation.java => CloseRequestOperation.java} | 35 +++++++++++-----------
...tion.java => SourceNoMoreElementOperation.java} | 6 ++--
10 files changed, 68 insertions(+), 47 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 6a1c992a5..6ed009de7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -84,8 +84,6 @@ public class SeaTunnelClientTest {
jobProxy.waitForJobComplete();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
- // TODO throw exception after fix sink.setTypeInfo in ConnectorInstanceLoader
- // throw new RuntimeException(e);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
index 8a36d9ea4..4c48060b4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
import com.hazelcast.internal.nio.IOUtil;
@@ -41,8 +41,8 @@ public class DeployTaskOperation extends AsyncOperation {
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
- TaskExecutionService taskExecutionService = getService();
- return taskExecutionService.deployTask(taskImmutableInformation);
+ SeaTunnelServer server = getService();
+ return server.getTaskExecutionService().deployTask(taskImmutableInformation);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index b7b0995c4..059325a36 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -24,9 +24,10 @@ import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.source.CloseRequestOperation;
import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
-import org.apache.seatunnel.engine.server.task.operation.source.SourceUnregisterOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -53,6 +54,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
public static final int PROGRESS_TYPE = 9;
+ public static final int CLOSE_REQUEST_TYPE = 10;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -82,7 +85,7 @@ public class TaskDataSerializerHook implements DataSerializerHook {
case TASK_GROUP_INFO_TYPE:
return new TaskGroupImmutableInformation();
case SOURCE_UNREGISTER_TYPE:
- return new SourceUnregisterOperation();
+ return new SourceNoMoreElementOperation();
case SINK_REGISTER_TYPE:
return new SinkRegisterOperation();
case SINK_UNREGISTER_TYPE:
@@ -91,6 +94,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
return new TaskLocation();
case PROGRESS_TYPE:
return new Progress();
+ case CLOSE_REQUEST_TYPE:
+ return new CloseRequestOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 39d0f6294..b5098976c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -61,6 +61,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
protected FlowLifeCycle startFlowLifeCycle;
+ private List<FlowLifeCycle> allCycles;
+
protected List<OneInputFlowLifeCycle<Record<?>>> outputs;
protected List<CompletableFuture<Void>> flowFutures;
@@ -79,7 +81,11 @@ public abstract class SeaTunnelTask extends AbstractTask {
public void init() throws Exception {
super.init();
flowFutures = new ArrayList<>();
+ allCycles = new ArrayList<>();
startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
+ for (FlowLifeCycle cycle : allCycles) {
+ cycle.init();
+ }
}
public void setTaskGroup(TaskGroup group) {
@@ -132,7 +138,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
} else {
throw new UnknownFlowException(flow);
}
- lifeCycle.init();
+ allCycles.add(lifeCycle);
return lifeCycle;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 9f7d97f80..c673271d0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -23,21 +23,25 @@ import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
+import org.apache.seatunnel.engine.server.task.operation.source.CloseRequestOperation;
import org.apache.seatunnel.engine.server.task.statemachine.EnumeratorState;
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends CoordinatorTask {
@@ -49,7 +53,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
private final SourceAction<?, SplitT, ?> source;
private SourceSplitEnumerator<?, ?> enumerator;
private int maxReaderSize;
- private AtomicInteger unfinishedReaders;
+ private Set<Long> unfinishedReaders;
private Map<TaskLocation, Address> taskMemberMapping;
private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
@@ -70,7 +74,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
taskMemberMapping = new ConcurrentHashMap<>();
taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
maxReaderSize = source.getParallelism();
- unfinishedReaders = new AtomicInteger(maxReaderSize);
+ unfinishedReaders = new CopyOnWriteArraySet<>();
enumerator.open();
}
@@ -122,8 +126,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
}
public void readerFinished(long taskID) {
- removeTaskMemberMapping(taskID);
- if (unfinishedReaders.decrementAndGet() == 0) {
+ unfinishedReaders.remove(taskID);
+ if (unfinishedReaders.isEmpty()) {
readerFinishFuture.complete(null);
}
}
@@ -147,6 +151,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
currState = EnumeratorState.READER_CLOSED;
break;
case READER_CLOSED:
+ closeAllReader();
currState = EnumeratorState.CLOSED;
break;
case CLOSED:
@@ -167,16 +172,19 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
readerRegisterFuture.join();
}
- public void removeTaskMemberMapping(long taskID) {
- TaskLocation taskLocation = taskIDToTaskLocationMapping.get(taskID);
- taskMemberMapping.remove(taskLocation);
- taskIDToTaskLocationMapping.remove(taskID);
- }
-
public Set<Long> getRegisteredReaders() {
return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskID).collect(Collectors.toSet());
}
+ private void closeAllReader() {
+ List<InvocationFuture<?>> futures = new ArrayList<>();
+ taskMemberMapping.forEach((location, address) -> {
+ futures.add(this.getExecutionContext().sendToMember(new CloseRequestOperation(location),
+ address));
+ });
+ futures.forEach(InvocationFuture::join);
+ }
+
@Override
public Set<URL> getJarsUrl() {
return new HashSet<>(source.getJarUrls());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index c5f4fb9cc..3b499d63f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -25,8 +25,8 @@ import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SourceReaderContext;
import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
-import org.apache.seatunnel.engine.server.task.operation.source.SourceUnregisterOperation;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -95,9 +95,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
try {
this.closed = true;
collector.close();
- runningTask.getExecutionContext().sendToMaster(new SourceUnregisterOperation(currentTaskID,
+ runningTask.getExecutionContext().sendToMaster(new SourceNoMoreElementOperation(currentTaskID,
enumeratorTaskID)).get();
- this.close();
} catch (Exception e) {
LOGGER.warning("source close failed ", e);
throw new RuntimeException(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index ebd90a0f8..c1698450a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.server.task.TaskRuntimeException;
import org.apache.seatunnel.engine.server.task.record.ClosedSign;
import java.util.List;
@@ -42,6 +43,11 @@ public class TransformFlowLifeCycle<T> extends AbstractFlowLifeCycle implements
public void received(Record<?> row) {
if (row.getData() instanceof ClosedSign) {
collector.collect(row);
+ try {
+ this.close();
+ } catch (Exception e) {
+ throw new TaskRuntimeException(e);
+ }
} else {
T r = (T) row.getData();
for (SeaTunnelTransform<T> t : transform) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
index 6ec3c86b1..08d5de1c1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
@@ -23,10 +23,10 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
@@ -40,15 +40,13 @@ public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
@Override
public void init() {
- blockingQueueCache = new HashMap<>();
+ blockingQueueCache = new ConcurrentHashMap<>();
getTasks().stream().filter(SeaTunnelTask.class::isInstance)
.map(s -> (SeaTunnelTask) s).forEach(s -> s.setTaskGroup(this));
}
public BlockingQueue<Record<?>> getBlockingQueueCache(long id) {
- if (!blockingQueueCache.containsKey(id)) {
- blockingQueueCache.put(id, new ArrayBlockingQueue<>(QUEUE_SIZE));
- }
+ blockingQueueCache.computeIfAbsent(id, i -> new ArrayBlockingQueue<>(QUEUE_SIZE));
return blockingQueueCache.get(id);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
similarity index 67%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index 935d81a9d..ee3783352 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
+import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
@@ -29,40 +29,41 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class SourceUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class CloseRequestOperation extends Operation implements IdentifiedDataSerializable {
- private TaskLocation currentTaskID;
- private TaskLocation enumeratorTaskID;
+ private TaskLocation readerLocation;
- public SourceUnregisterOperation() {
+ public CloseRequestOperation() {
}
- public SourceUnregisterOperation(TaskLocation currentTaskID, TaskLocation enumeratorTaskID) {
- this.currentTaskID = currentTaskID;
- this.enumeratorTaskID = enumeratorTaskID;
+ public CloseRequestOperation(TaskLocation readerLocation) {
+ this.readerLocation = readerLocation;
}
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SourceSplitEnumeratorTask<?> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
- .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
- task.readerFinished(currentTaskID.getTaskID());
+ SourceSeaTunnelTask<?, ?> task =
+ server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
+ .getTaskGroup().getTask(readerLocation.getTaskID());
+ task.close();
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- currentTaskID.writeData(out);
- enumeratorTaskID.writeData(out);
+ readerLocation.writeData(out);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- currentTaskID.readData(in);
- enumeratorTaskID.readData(in);
+ readerLocation.readData(in);
}
@Override
@@ -72,6 +73,6 @@ public class SourceUnregisterOperation extends Operation implements IdentifiedDa
@Override
public int getClassId() {
- return TaskDataSerializerHook.SOURCE_UNREGISTER_TYPE;
+ return TaskDataSerializerHook.CLOSE_REQUEST_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
similarity index 91%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 935d81a9d..b8cabcc16 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceUnregisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -29,15 +29,15 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class SourceUnregisterOperation extends Operation implements IdentifiedDataSerializable {
+public class SourceNoMoreElementOperation extends Operation implements IdentifiedDataSerializable {
private TaskLocation currentTaskID;
private TaskLocation enumeratorTaskID;
- public SourceUnregisterOperation() {
+ public SourceNoMoreElementOperation() {
}
- public SourceUnregisterOperation(TaskLocation currentTaskID, TaskLocation enumeratorTaskID) {
+ public SourceNoMoreElementOperation(TaskLocation currentTaskID, TaskLocation enumeratorTaskID) {
this.currentTaskID = currentTaskID;
this.enumeratorTaskID = enumeratorTaskID;
}