You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2024/04/07 08:48:56 UTC

(seatunnel) branch dev updated: [Improve][Transform] Remove Fallback during parsing Transform process (#6644)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 50c6c94d27 [Improve][Transform] Remove Fallback during parsing Transform process (#6644)
50c6c94d27 is described below

commit 50c6c94d27ef4fb3e99c71a7ef451af9aa2e4258
Author: xiaochen <59...@qq.com>
AuthorDate: Sun Apr 7 16:48:51 2024 +0800

    [Improve][Transform] Remove Fallback during parsing Transform process (#6644)
---
 .../api/transform/SeaTunnelTransform.java          | 20 +++---------
 .../flink/execution/TransformExecuteProcessor.java | 28 ++++++++---------
 .../engine/core/parse/JobConfigParser.java         | 35 ---------------------
 .../core/parse/MultipleTableJobConfigParser.java   | 36 +++-------------------
 .../engine/server/master/JobMetricsTest.java       | 10 +++---
 .../common/AbstractCatalogSupportTransform.java    | 10 ------
 .../common/AbstractSeaTunnelTransform.java         |  6 ----
 7 files changed, 27 insertions(+), 118 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index a7ccd081ce..a64e1b7c7d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.api.transform;
 
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
-import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.source.SeaTunnelJobAware;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -26,10 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import java.io.Serializable;
 
 public interface SeaTunnelTransform<T>
-        extends Serializable,
-                PluginIdentifierInterface,
-                SeaTunnelPluginLifeCycle,
-                SeaTunnelJobAware {
+        extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware {
 
     /** call it when Transformer initialed */
     default void open() {}
@@ -45,22 +41,14 @@ public interface SeaTunnelTransform<T>
         throw new UnsupportedOperationException("setTypeInfo method is not supported");
     }
 
-    /**
-     * Get the data type of the records produced by this transform.
-     *
-     * @deprecated Please use {@link #getProducedCatalogTable}
-     * @return Produced data type.
-     */
-    @Deprecated
-    SeaTunnelDataType<T> getProducedType();
-
     /** Get the catalog table output by this transform */
     CatalogTable getProducedCatalogTable();
 
     /**
-     * Transform input data to {@link this#getProducedType()} types data.
+     * Transform input data into rows of the type described by {@link #getProducedCatalogTable()}
+     * (see {@link CatalogTable#getSeaTunnelRowType()}).
      *
-     * @param row the data need be transform.
+     * @param row the data to be transformed.
      * @return transformed data.
      */
     T map(T row);
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 450599ff7b..d91bb9d3da 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
 
 import java.net.URL;
 import java.util.Collections;
@@ -119,24 +118,25 @@ public class TransformExecuteProcessor
 
     protected DataStream<Row> flinkTransform(
             SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
-        TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
+        TypeInformation rowTypeInfo =
+                TypeConverterUtils.convert(
+                        transform.getProducedCatalogTable().getSeaTunnelRowType());
         FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
         FlinkRowConverter transformOutputRowConverter =
                 new FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
         DataStream<Row> output =
                 stream.flatMap(
-                        new FlatMapFunction<Row, Row>() {
-                            @Override
-                            public void flatMap(Row value, Collector<Row> out) throws Exception {
-                                SeaTunnelRow seaTunnelRow =
-                                        transformInputRowConverter.reconvert(value);
-                                SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
-                                if (dataRow != null) {
-                                    Row copy = transformOutputRowConverter.convert(dataRow);
-                                    out.collect(copy);
-                                }
-                            }
-                        },
+                        (FlatMapFunction<Row, Row>)
+                                (value, out) -> {
+                                    SeaTunnelRow seaTunnelRow =
+                                            transformInputRowConverter.reconvert(value);
+                                    SeaTunnelRow dataRow =
+                                            (SeaTunnelRow) transform.map(seaTunnelRow);
+                                    if (dataRow != null) {
+                                        Row copy = transformOutputRowConverter.convert(dataRow);
+                                        out.collect(copy);
+                                    }
+                                },
                         rowTypeInfo);
         return output;
     }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 2ef1a28aff..981b85049a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -23,10 +23,8 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -34,7 +32,6 @@ import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
-import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -99,38 +96,6 @@ public class JobConfigParser {
         return new Tuple2<>(catalogTable, action);
     }
 
-    public Tuple2<CatalogTable, Action> parseTransform(
-            Config config,
-            JobConfig jobConfig,
-            String tableId,
-            int parallelism,
-            SeaTunnelRowType rowType,
-            Set<Action> inputActions) {
-        final ImmutablePair<SeaTunnelTransform<?>, Set<URL>> tuple =
-                ConnectorInstanceLoader.loadTransformInstance(
-                        config, jobConfig.getJobContext(), commonPluginJars);
-        final SeaTunnelTransform<?> transform = tuple.getLeft();
-        // old logic: prepare(initialization) -> set job context -> set row type (There is a logical
-        // judgment that depends on before and after, not a simple set)
-        transform.prepare(config);
-        transform.setJobContext(jobConfig.getJobContext());
-        transform.setTypeInfo((SeaTunnelDataType) rowType);
-        final String actionName = createTransformActionName(0, tuple.getLeft().getPluginName());
-        final TransformAction action =
-                new TransformAction(
-                        idGenerator.getNextId(),
-                        actionName,
-                        new ArrayList<>(inputActions),
-                        transform,
-                        tuple.getRight(),
-                        new HashSet<>());
-        action.setParallelism(parallelism);
-        CatalogTable catalogTable =
-                CatalogTableUtil.getCatalogTable(
-                        tableId, (SeaTunnelRowType) transform.getProducedType());
-        return new Tuple2<>(catalogTable, action);
-    }
-
     public List<SinkAction<?, ?, ?, ?>> parseSinks(
             int configIndex,
             List<List<Tuple2<CatalogTable, Action>>> inputVertices,
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index f988f293a5..883c7c59fa 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -34,9 +34,7 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -405,35 +403,14 @@ public class MultipleTableJobConfigParser {
         final String tableId =
                 readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
 
-        boolean fallback =
-                isFallback(
-                        classLoader,
-                        TableTransformFactory.class,
-                        factoryId,
-                        (factory) -> factory.createTransform(null));
-
         Set<Action> inputActions =
                 inputs.stream()
                         .map(Tuple2::_2)
                         .collect(Collectors.toCollection(LinkedHashSet::new));
-        SeaTunnelDataType<?> expectedType = getProducedType(inputs.get(0)._2());
         checkProducedTypeEquals(inputActions);
         int spareParallelism = inputs.get(0)._2().getParallelism();
         int parallelism =
                 readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
-        if (fallback) {
-            Tuple2<CatalogTable, Action> tuple =
-                    fallbackParser.parseTransform(
-                            config,
-                            jobConfig,
-                            tableId,
-                            parallelism,
-                            (SeaTunnelRowType) expectedType,
-                            inputActions);
-            tableWithActionMap.put(tableId, Collections.singletonList(tuple));
-            return;
-        }
-
         CatalogTable catalogTable = inputs.get(0)._1();
         SeaTunnelTransform<?> transform =
                 FactoryUtil.createAndPrepareTransform(
@@ -470,15 +447,10 @@ public class MultipleTableJobConfigParser {
                 return ((SourceAction<?, ?, ?>) action).getSource().getProducedType();
             }
         } else if (action instanceof TransformAction) {
-            try {
-                return ((TransformAction) action)
-                        .getTransform()
-                        .getProducedCatalogTable()
-                        .getSeaTunnelRowType();
-            } catch (UnsupportedOperationException e) {
-                // TODO remove it when all connector use `getProducedCatalogTables`
-                return ((TransformAction) action).getTransform().getProducedType();
-            }
+            return ((TransformAction) action)
+                    .getTransform()
+                    .getProducedCatalogTable()
+                    .getSeaTunnelRowType();
         }
         throw new UnsupportedOperationException();
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 3b81f2a655..ed12a565d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -137,7 +137,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                                         server.getCoordinatorService().getJobStatus(jobId3)));
 
         // check metrics
-        await().atMost(60000, TimeUnit.MILLISECONDS)
+        await().atMost(600000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
                             JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
@@ -161,12 +161,12 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
         server.getCoordinatorService().cancelJob(jobId3);
     }
 
-    private void startJob(Long jobid, String path, boolean isStartWithSavePoint) {
-        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
+    private void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
+        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);
 
         JobImmutableInformation jobImmutableInformation =
                 new JobImmutableInformation(
-                        jobid,
+                        jobId,
                         "Test",
                         isStartWithSavePoint,
                         nodeEngine.getSerializationService().toData(testLogicalDag),
@@ -177,7 +177,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
         Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
 
         PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-                server.getCoordinatorService().submitJob(jobid, data);
+                server.getCoordinatorService().submitJob(jobId, data);
         voidPassiveCompletableFuture.join();
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index 78fe02094f..5670bcc129 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.transform.common;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import lombok.NonNull;
 
@@ -61,12 +59,4 @@ public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelT
     protected abstract TableSchema transformTableSchema();
 
     protected abstract TableIdentifier transformTableIdentifier();
-
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        if (outputRowType != null) {
-            return outputRowType;
-        }
-        return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType();
-    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index b710034cad..1892881c27 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -30,11 +29,6 @@ public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<S
 
     protected SeaTunnelRowType outputRowType;
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return outputRowType;
-    }
-
     @Override
     public SeaTunnelRow map(SeaTunnelRow row) {
         return transformRow(row);