You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/15 07:47:07 UTC

[incubator-seatunnel] branch st-engine updated: [hotfix][engine] fix serialization exception (#2734)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new dc6c09d15 [hotfix][engine] fix serialization exception (#2734)
dc6c09d15 is described below

commit dc6c09d159a8baab7d2edb43f5064e7aaaf1c917
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Sep 15 15:47:01 2022 +0800

    [hotfix][engine] fix serialization exception (#2734)
---
 .../apache/seatunnel/engine/core/dag/actions/AbstractAction.java | 2 +-
 .../engine/server/dag/execution/ExecutionPlanGenerator.java      | 2 --
 .../org/apache/seatunnel/engine/server/task/SeaTunnelTask.java   | 3 +--
 .../server/task/operation/sink/SinkPrepareCommitOperation.java   | 9 +++++----
 4 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index 28c0f984b..f38046e62 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -26,7 +26,7 @@ import java.util.Set;
 
 public abstract class AbstractAction implements Action {
     private String name;
-    private List<Action> upstreams = new ArrayList<>();
+    private transient List<Action> upstreams = new ArrayList<>();
     // This is used to assign a unique ID to every Action
     private long id;
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index b32f53b08..25aa11ef5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -187,13 +187,11 @@ public class ExecutionPlanGenerator {
         if (action instanceof PartitionTransformAction) {
             newAction = new PartitionTransformAction(id,
                 action.getName(),
-                action.getUpstream(),
                 ((PartitionTransformAction) action).getPartitionTransformation(),
                 action.getJarUrls());
         } else if (action instanceof SinkAction) {
             newAction = new SinkAction<>(id,
                 action.getName(),
-                action.getUpstream(),
                 ((SinkAction<?, ?, ?, ?>) action).getSink(),
                 action.getJarUrls());
         } else if (action instanceof SourceAction){
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 6ffdcd66a..f7b215820 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -233,9 +233,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
     public Set<URL> getJarsUrl() {
         List<Flow> now = Collections.singletonList(executionFlow);
         Set<URL> urls = new HashSet<>();
-        List<Flow> next = new ArrayList<>();
         while (!now.isEmpty()) {
-            next.clear();
+            final List<Flow> next = new ArrayList<>();
             now.forEach(n -> {
                 if (n instanceof PhysicalExecutionFlow) {
                     urls.addAll(((PhysicalExecutionFlow) n).getAction().getJarUrls());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index 6d970ed53..f206071f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.operation.sink;
 
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
 import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -69,10 +70,10 @@ public class SinkPrepareCommitOperation extends CheckpointBarrierTriggerOperatio
 
     @Override
     public void run() throws Exception {
-        SeaTunnelServer server = getService();
-        SinkAggregatedCommitterTask<?, ?> committerTask = server.getTaskExecutionService().getTask(taskLocation);
-        // TODO add classloader support with #2704
-        committerTask.receivedWriterCommitInfo(barrier.getId(), SerializationUtils.deserialize(commitInfos));
+        TaskExecutionService taskExecutionService = ((SeaTunnelServer) getService()).getTaskExecutionService();
+        SinkAggregatedCommitterTask<?, ?> committerTask = taskExecutionService.getTask(taskLocation);
+        ClassLoader classLoader = taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
+        committerTask.receivedWriterCommitInfo(barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
         committerTask.triggerBarrier(barrier);
     }
 }