You are viewing a plain text version of this content. Its canonical link was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2024/04/07 08:48:56 UTC
(seatunnel) branch dev updated: [Improve][Transform] Remove Fallback during parsing Transform process (#6644)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 50c6c94d27 [Improve][Transform] Remove Fallback during parsing Transform process (#6644)
50c6c94d27 is described below
commit 50c6c94d27ef4fb3e99c71a7ef451af9aa2e4258
Author: xiaochen <59...@qq.com>
AuthorDate: Sun Apr 7 16:48:51 2024 +0800
[Improve][Transform] Remove Fallback during parsing Transform process (#6644)
---
.../api/transform/SeaTunnelTransform.java | 20 +++---------
.../flink/execution/TransformExecuteProcessor.java | 28 ++++++++---------
.../engine/core/parse/JobConfigParser.java | 35 ---------------------
.../core/parse/MultipleTableJobConfigParser.java | 36 +++-------------------
.../engine/server/master/JobMetricsTest.java | 10 +++---
.../common/AbstractCatalogSupportTransform.java | 10 ------
.../common/AbstractSeaTunnelTransform.java | 6 ----
7 files changed, 27 insertions(+), 118 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index a7ccd081ce..a64e1b7c7d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.api.transform;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
-import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -26,10 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import java.io.Serializable;
public interface SeaTunnelTransform<T>
- extends Serializable,
- PluginIdentifierInterface,
- SeaTunnelPluginLifeCycle,
- SeaTunnelJobAware {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware {
/** call it when Transformer initialed */
default void open() {}
@@ -45,22 +41,14 @@ public interface SeaTunnelTransform<T>
throw new UnsupportedOperationException("setTypeInfo method is not supported");
}
- /**
- * Get the data type of the records produced by this transform.
- *
- * @deprecated Please use {@link #getProducedCatalogTable}
- * @return Produced data type.
- */
- @Deprecated
- SeaTunnelDataType<T> getProducedType();
-
/** Get the catalog table output by this transform */
CatalogTable getProducedCatalogTable();
/**
- * Transform input data to {@link this#getProducedType()} types data.
+ * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
+ * data.
*
- * @param row the data need be transform.
+ * @param row the data need be transformed.
* @return transformed data.
*/
T map(T row);
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 450599ff7b..d91bb9d3da 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
import java.net.URL;
import java.util.Collections;
@@ -119,24 +118,25 @@ public class TransformExecuteProcessor
protected DataStream<Row> flinkTransform(
SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
- TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
+ TypeInformation rowTypeInfo =
+ TypeConverterUtils.convert(
+ transform.getProducedCatalogTable().getSeaTunnelRowType());
FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
FlinkRowConverter transformOutputRowConverter =
new FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
DataStream<Row> output =
stream.flatMap(
- new FlatMapFunction<Row, Row>() {
- @Override
- public void flatMap(Row value, Collector<Row> out) throws Exception {
- SeaTunnelRow seaTunnelRow =
- transformInputRowConverter.reconvert(value);
- SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
- if (dataRow != null) {
- Row copy = transformOutputRowConverter.convert(dataRow);
- out.collect(copy);
- }
- }
- },
+ (FlatMapFunction<Row, Row>)
+ (value, out) -> {
+ SeaTunnelRow seaTunnelRow =
+ transformInputRowConverter.reconvert(value);
+ SeaTunnelRow dataRow =
+ (SeaTunnelRow) transform.map(seaTunnelRow);
+ if (dataRow != null) {
+ Row copy = transformOutputRowConverter.convert(dataRow);
+ out.collect(copy);
+ }
+ },
rowTypeInfo);
return output;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 2ef1a28aff..981b85049a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -23,10 +23,8 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -34,7 +32,6 @@ import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
-import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -99,38 +96,6 @@ public class JobConfigParser {
return new Tuple2<>(catalogTable, action);
}
- public Tuple2<CatalogTable, Action> parseTransform(
- Config config,
- JobConfig jobConfig,
- String tableId,
- int parallelism,
- SeaTunnelRowType rowType,
- Set<Action> inputActions) {
- final ImmutablePair<SeaTunnelTransform<?>, Set<URL>> tuple =
- ConnectorInstanceLoader.loadTransformInstance(
- config, jobConfig.getJobContext(), commonPluginJars);
- final SeaTunnelTransform<?> transform = tuple.getLeft();
- // old logic: prepare(initialization) -> set job context -> set row type (There is a logical
- // judgment that depends on before and after, not a simple set)
- transform.prepare(config);
- transform.setJobContext(jobConfig.getJobContext());
- transform.setTypeInfo((SeaTunnelDataType) rowType);
- final String actionName = createTransformActionName(0, tuple.getLeft().getPluginName());
- final TransformAction action =
- new TransformAction(
- idGenerator.getNextId(),
- actionName,
- new ArrayList<>(inputActions),
- transform,
- tuple.getRight(),
- new HashSet<>());
- action.setParallelism(parallelism);
- CatalogTable catalogTable =
- CatalogTableUtil.getCatalogTable(
- tableId, (SeaTunnelRowType) transform.getProducedType());
- return new Tuple2<>(catalogTable, action);
- }
-
public List<SinkAction<?, ?, ?, ?>> parseSinks(
int configIndex,
List<List<Tuple2<CatalogTable, Action>>> inputVertices,
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index f988f293a5..883c7c59fa 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -34,9 +34,7 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -405,35 +403,14 @@ public class MultipleTableJobConfigParser {
final String tableId =
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
- boolean fallback =
- isFallback(
- classLoader,
- TableTransformFactory.class,
- factoryId,
- (factory) -> factory.createTransform(null));
-
Set<Action> inputActions =
inputs.stream()
.map(Tuple2::_2)
.collect(Collectors.toCollection(LinkedHashSet::new));
- SeaTunnelDataType<?> expectedType = getProducedType(inputs.get(0)._2());
checkProducedTypeEquals(inputActions);
int spareParallelism = inputs.get(0)._2().getParallelism();
int parallelism =
readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
- if (fallback) {
- Tuple2<CatalogTable, Action> tuple =
- fallbackParser.parseTransform(
- config,
- jobConfig,
- tableId,
- parallelism,
- (SeaTunnelRowType) expectedType,
- inputActions);
- tableWithActionMap.put(tableId, Collections.singletonList(tuple));
- return;
- }
-
CatalogTable catalogTable = inputs.get(0)._1();
SeaTunnelTransform<?> transform =
FactoryUtil.createAndPrepareTransform(
@@ -470,15 +447,10 @@ public class MultipleTableJobConfigParser {
return ((SourceAction<?, ?, ?>) action).getSource().getProducedType();
}
} else if (action instanceof TransformAction) {
- try {
- return ((TransformAction) action)
- .getTransform()
- .getProducedCatalogTable()
- .getSeaTunnelRowType();
- } catch (UnsupportedOperationException e) {
- // TODO remove it when all connector use `getProducedCatalogTables`
- return ((TransformAction) action).getTransform().getProducedType();
- }
+ return ((TransformAction) action)
+ .getTransform()
+ .getProducedCatalogTable()
+ .getSeaTunnelRowType();
}
throw new UnsupportedOperationException();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 3b81f2a655..ed12a565d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -137,7 +137,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
server.getCoordinatorService().getJobStatus(jobId3)));
// check metrics
- await().atMost(60000, TimeUnit.MILLISECONDS)
+ await().atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
@@ -161,12 +161,12 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
server.getCoordinatorService().cancelJob(jobId3);
}
- private void startJob(Long jobid, String path, boolean isStartWithSavePoint) {
- LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
+ private void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
+ LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
- jobid,
+ jobId,
"Test",
isStartWithSavePoint,
nodeEngine.getSerializationService().toData(testLogicalDag),
@@ -177,7 +177,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobid, data);
+ server.getCoordinatorService().submitJob(jobId, data);
voidPassiveCompletableFuture.join();
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index 78fe02094f..5670bcc129 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import lombok.NonNull;
@@ -61,12 +59,4 @@ public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelT
protected abstract TableSchema transformTableSchema();
protected abstract TableIdentifier transformTableIdentifier();
-
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- if (outputRowType != null) {
- return outputRowType;
- }
- return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType();
- }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index b710034cad..1892881c27 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -30,11 +29,6 @@ public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<S
protected SeaTunnelRowType outputRowType;
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return outputRowType;
- }
-
@Override
public SeaTunnelRow map(SeaTunnelRow row) {
return transformRow(row);