You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/15 07:47:07 UTC
[incubator-seatunnel] branch st-engine updated: [hotfix][engine] fix serialization exception (#2734)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new dc6c09d15 [hotfix][engine] fix serialization exception (#2734)
dc6c09d15 is described below
commit dc6c09d159a8baab7d2edb43f5064e7aaaf1c917
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Sep 15 15:47:01 2022 +0800
[hotfix][engine] fix serialization exception (#2734)
---
.../apache/seatunnel/engine/core/dag/actions/AbstractAction.java | 2 +-
.../engine/server/dag/execution/ExecutionPlanGenerator.java | 2 --
.../org/apache/seatunnel/engine/server/task/SeaTunnelTask.java | 3 +--
.../server/task/operation/sink/SinkPrepareCommitOperation.java | 9 +++++----
4 files changed, 7 insertions(+), 9 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index 28c0f984b..f38046e62 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -26,7 +26,7 @@ import java.util.Set;
public abstract class AbstractAction implements Action {
private String name;
- private List<Action> upstreams = new ArrayList<>();
+ private transient List<Action> upstreams = new ArrayList<>();
// This is used to assign a unique ID to every Action
private long id;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index b32f53b08..25aa11ef5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -187,13 +187,11 @@ public class ExecutionPlanGenerator {
if (action instanceof PartitionTransformAction) {
newAction = new PartitionTransformAction(id,
action.getName(),
- action.getUpstream(),
((PartitionTransformAction) action).getPartitionTransformation(),
action.getJarUrls());
} else if (action instanceof SinkAction) {
newAction = new SinkAction<>(id,
action.getName(),
- action.getUpstream(),
((SinkAction<?, ?, ?, ?>) action).getSink(),
action.getJarUrls());
} else if (action instanceof SourceAction){
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 6ffdcd66a..f7b215820 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -233,9 +233,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
public Set<URL> getJarsUrl() {
List<Flow> now = Collections.singletonList(executionFlow);
Set<URL> urls = new HashSet<>();
- List<Flow> next = new ArrayList<>();
while (!now.isEmpty()) {
- next.clear();
+ final List<Flow> next = new ArrayList<>();
now.forEach(n -> {
if (n instanceof PhysicalExecutionFlow) {
urls.addAll(((PhysicalExecutionFlow) n).getAction().getJarUrls());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index 6d970ed53..f206071f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.operation.sink;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -69,10 +70,10 @@ public class SinkPrepareCommitOperation extends CheckpointBarrierTriggerOperatio
@Override
public void run() throws Exception {
- SeaTunnelServer server = getService();
- SinkAggregatedCommitterTask<?, ?> committerTask = server.getTaskExecutionService().getTask(taskLocation);
- // TODO add classloader support with #2704
- committerTask.receivedWriterCommitInfo(barrier.getId(), SerializationUtils.deserialize(commitInfos));
+ TaskExecutionService taskExecutionService = ((SeaTunnelServer) getService()).getTaskExecutionService();
+ SinkAggregatedCommitterTask<?, ?> committerTask = taskExecutionService.getTask(taskLocation);
+ ClassLoader classLoader = taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
+ committerTask.receivedWriterCommitInfo(barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
committerTask.triggerBarrier(barrier);
}
}