You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/30 07:57:44 UTC

[seatunnel] branch dev updated: [Bugfix][DAG] Fix the incorrect setting of transform parallelism (#4814)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 54a5e2b52 [Bugfix][DAG] Fix the incorrect setting of transform parallelism (#4814)
54a5e2b52 is described below

commit 54a5e2b5264b7bc6ad8c23ba84a019611f4dd8f8
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Tue May 30 15:57:38 2023 +0800

    [Bugfix][DAG] Fix the incorrect setting of transform parallelism (#4814)
---
 .../seatunnel/engine/core/parse/MultipleTableJobConfigParser.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 8fd9d90bd..5fd4892cd 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -409,9 +409,9 @@ public class MultipleTableJobConfigParser {
         SeaTunnelDataType<?> expectedType = getProducedType(inputs.get(0)._2());
         checkProducedTypeEquals(inputActions);
         int spareParallelism = inputs.get(0)._2().getParallelism();
+        int parallelism =
+                readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
         if (fallback) {
-            int parallelism =
-                    readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
             Tuple2<CatalogTable, Action> tuple =
                     fallbackParser.parseTransform(
                             config,
@@ -437,6 +437,7 @@ public class MultipleTableJobConfigParser {
         TransformAction transformAction =
                 new TransformAction(
                         id, actionName, new ArrayList<>(inputActions), transform, factoryUrls);
+        transformAction.setParallelism(parallelism);
         tableWithActionMap.put(
                 tableId,
                 Collections.singletonList(