You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/09/25 04:14:04 UTC
[seatunnel] branch dev updated: [Feature][Flink] Support Decimal Type with configurable precision and scale (#5419)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7348be2a7a [Feature][Flink] Support Decimal Type with configurable precision and scale (#5419)
7348be2a7a is described below
commit 7348be2a7a530704cfaffd77e4bf552639618f31
Author: Chengyu Yan <ch...@hotmail.com>
AuthorDate: Mon Sep 25 12:13:58 2023 +0800
[Feature][Flink] Support Decimal Type with configurable precision and scale (#5419)
---
release-note.md | 1 +
.../flink/execution/FlinkRuntimeEnvironment.java | 25 ++++++++++++++++-
.../flink/execution/SinkExecuteProcessor.java | 5 ++--
.../FlinkAbstractPluginExecuteProcessor.java | 32 ++++++++++++++++++++++
.../flink/execution/FlinkRuntimeEnvironment.java | 25 +++++++++++++++++
.../flink/execution/SinkExecuteProcessor.java | 5 ++--
.../flink/execution/SourceExecuteProcessor.java | 4 +++
.../flink/execution/TransformExecuteProcessor.java | 14 ++++++----
.../src/test/resources/fake_to_paimon.conf | 1 +
.../src/test/resources/paimon_to_assert.conf | 2 +-
.../flink/utils/TypeConverterUtilsTest.java | 11 ++++++--
.../flink/serialization/FlinkRowConverter.java | 24 ++++++++++++++++
.../flink/utils/TypeConverterUtils.java | 21 ++++++--------
13 files changed, 141 insertions(+), 29 deletions(-)
diff --git a/release-note.md b/release-note.md
index 27f57f4ea7..f71a24636a 100644
--- a/release-note.md
+++ b/release-note.md
@@ -148,6 +148,7 @@
- [Core] [API] Add copy method to Catalog codes (#4414)
- [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424)
- [Core] [Shade] Add guava shade module (#4358)
+- [Core] [Flink] Support Decimal Type with configurable precision and scale (#5419)
### Connector-V2
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 996c9698fb..4bd81769fb 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
@@ -51,8 +52,11 @@ import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@@ -64,7 +68,8 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
private StreamExecutionEnvironment environment;
private StreamTableEnvironment tableEnvironment;
-
+ private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
+ private Optional<SeaTunnelRowType> defaultType = Optional.empty();
private JobMode jobMode;
private String jobName = Constants.LOGO;
@@ -334,6 +339,24 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
name, tableEnvironment.fromChangelogStream(dataStream));
}
+ public void stageType(String tblName, SeaTunnelRowType type) {
+ stagedTypes.put(tblName, type);
+ }
+
+ public void stageDefaultType(SeaTunnelRowType type) {
+ this.defaultType = Optional.of(type);
+ }
+
+ public Optional<SeaTunnelRowType> type(String tblName) {
+ return stagedTypes.containsKey(tblName)
+ ? Optional.of(stagedTypes.get(tblName))
+ : Optional.empty();
+ }
+
+ public Optional<SeaTunnelRowType> defaultType() {
+ return this.defaultType;
+ }
+
public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 03bd2077e5..cf0ad1e7be 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -31,7 +31,6 @@ import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -101,8 +100,8 @@ public class SinkExecuteProcessor
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
- seaTunnelSink.setTypeInfo(
- (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
+ SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
+ seaTunnelSink.setTypeInfo(sourceType);
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 6c61f61b95..ed8b72a8f0 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -20,9 +20,11 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
@@ -117,6 +119,36 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
}
}
+ protected void stageType(Config pluginConfig, SeaTunnelRowType type) {
+ if (!flinkRuntimeEnvironment.defaultType().isPresent()) {
+ flinkRuntimeEnvironment.stageDefaultType(type);
+ }
+
+ if (pluginConfig.hasPath("result_table_name")) {
+ String tblName = pluginConfig.getString("result_table_name");
+ flinkRuntimeEnvironment.stageType(tblName, type);
+ }
+ }
+
+ protected Optional<SeaTunnelRowType> sourceType(Config pluginConfig) {
+ if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+ String tblName = pluginConfig.getString(SOURCE_TABLE_NAME);
+ return flinkRuntimeEnvironment.type(tblName);
+ } else {
+ return flinkRuntimeEnvironment.defaultType();
+ }
+ }
+
+ protected SeaTunnelRowType initSourceType(Config sinkConfig, DataStream<Row> stream) {
+ SeaTunnelRowType sourceType =
+ sourceType(sinkConfig)
+ .orElseGet(
+ () ->
+ (SeaTunnelRowType)
+ TypeConverterUtils.convert(stream.getType()));
+ return sourceType;
+ }
+
protected abstract List<T> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs);
}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 12168921d8..d8ff813f12 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
@@ -51,8 +52,11 @@ import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@@ -65,6 +69,9 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
private StreamTableEnvironment tableEnvironment;
+ private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
+ private Optional<SeaTunnelRowType> defaultType = Optional.empty();
+
private JobMode jobMode;
private String jobName = Constants.LOGO;
@@ -334,6 +341,24 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
name, tableEnvironment.fromChangelogStream(dataStream));
}
+ public void stageType(String tblName, SeaTunnelRowType type) {
+ stagedTypes.put(tblName, type);
+ }
+
+ public void stageDefaultType(SeaTunnelRowType type) {
+ this.defaultType = Optional.of(type);
+ }
+
+ public Optional<SeaTunnelRowType> type(String tblName) {
+ return stagedTypes.containsKey(tblName)
+ ? Optional.of(stagedTypes.get(tblName))
+ : Optional.empty();
+ }
+
+ public Optional<SeaTunnelRowType> defaultType() {
+ return this.defaultType;
+ }
+
public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index ca9a05f632..340351d1d4 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -31,7 +31,6 @@ import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -102,8 +101,8 @@ public class SinkExecuteProcessor
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
- seaTunnelSink.setTypeInfo(
- (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
+ SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
+ seaTunnelSink.setTypeInfo(sourceType);
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index f3ebdd0437..d74726a133 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -76,12 +77,15 @@ public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<
boolean bounded =
internalSource.getBoundedness()
== org.apache.seatunnel.api.source.Boundedness.BOUNDED;
+
DataStreamSource<Row> sourceStream =
addSource(
executionEnvironment,
sourceFunction,
"SeaTunnel " + internalSource.getClass().getSimpleName(),
bounded);
+ stageType(pluginConfig, (SeaTunnelRowType) internalSource.getProducedType());
+
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index a358fb6f33..0dc36c62b0 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -97,7 +97,10 @@ public class TransformExecuteProcessor
SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
- input = flinkTransform(transform, stream);
+ SeaTunnelRowType sourceType = initSourceType(pluginConfig, stream);
+ transform.setTypeInfo(sourceType);
+ input = flinkTransform(sourceType, transform, stream);
+ stageType(pluginConfig, (SeaTunnelRowType) transform.getProducedType());
registerResultTable(pluginConfig, input);
result.add(input);
} catch (Exception e) {
@@ -111,11 +114,10 @@ public class TransformExecuteProcessor
return result;
}
- protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {
- SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType());
- transform.setTypeInfo(seaTunnelDataType);
+ protected DataStream<Row> flinkTransform(
+ SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
- FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType);
+ FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
FlinkRowConverter transformOutputRowConverter =
new FlinkRowConverter(transform.getProducedType());
DataStream<Row> output =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
index 8afd81989c..8e5f00ee7b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
@@ -27,6 +27,7 @@ env {
source {
FakeSource {
+ row.num = 100000
schema = {
fields {
c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
index d27893cedc..cbd39a0fb0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
@@ -44,7 +44,7 @@ sink {
row_rules = [
{
rule_type = MAX_ROW
- rule_value = 5
+ rule_value = 100000
}
],
field_rules = [
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
index d180af69aa..95cfa335e7 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
@@ -83,9 +83,14 @@ public class TypeConverterUtilsTest {
@Test
public void convertBigDecimalType() {
- Assertions.assertEquals(
- BasicTypeInfo.BIG_DEC_TYPE_INFO,
- TypeConverterUtils.convert(new DecimalType(30, 2)));
+ /**
+ * To solve lost precision and scale of {@link
+ * org.apache.seatunnel.api.table.type.DecimalType}, use {@link
+ * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the convert
+ * result of {@link org.apache.seatunnel.api.table.type.DecimalType} instance.
+ */
+ Assertions.assertEquals(
+ BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(new DecimalType(30, 2)));
}
@Test
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
index fa8d88e052..a1278cdf85 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.translation.flink.serialization;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -28,6 +29,8 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
@@ -68,6 +71,15 @@ public class FlinkRowConverter extends RowConverter<Row> {
case MAP:
return convertMap(
(Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::convert);
+
+ /**
+ * To solve lost precision and scale of {@link
+ * org.apache.seatunnel.api.table.type.DecimalType}, use {@link java.lang.String} as
+ * the convert result of {@link java.math.BigDecimal} instance.
+ */
+ case DECIMAL:
+ BigDecimal decimal = (BigDecimal) field;
+ return decimal.toString();
default:
return field;
}
@@ -122,6 +134,18 @@ public class FlinkRowConverter extends RowConverter<Row> {
case MAP:
return convertMap(
(Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::reconvert);
+
+ /**
+ * To solve lost precision and scale of {@link
+ * org.apache.seatunnel.api.table.type.DecimalType}, create {@link
+ * java.math.BigDecimal} instance from {@link java.lang.String} type field.
+ */
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) dataType;
+ String decimalData = (String) field;
+ BigDecimal decimal = new BigDecimal(decimalData);
+ decimal.setScale(decimalType.getScale(), RoundingMode.HALF_UP);
+ return decimal;
default:
return field;
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index fc8b4f6b3c..86fbd0833d 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
import java.math.BigDecimal;
import java.time.LocalDate;
@@ -70,11 +69,15 @@ public class TypeConverterUtils {
BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
BRIDGED_TYPES.put(
Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
- // TODO: there is a still an unresolved issue that the BigDecimal type will lose the
- // precision and scale
+ /**
+ * To solve lost precision and scale of {@link
+ * org.apache.seatunnel.api.table.type.DecimalType}, use {@link
+ * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the payload of
+ * {@link org.apache.seatunnel.api.table.type.DecimalType}.
+ */
BRIDGED_TYPES.put(
BigDecimal.class,
- BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO));
+ BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.STRING_TYPE_INFO));
// data time types
BRIDGED_TYPES.put(
@@ -134,10 +137,7 @@ public class TypeConverterUtils {
if (bridgedType != null) {
return bridgedType.getSeaTunnelType();
}
- if (dataType instanceof BigDecimalTypeInfo) {
- BigDecimalTypeInfo decimalType = (BigDecimalTypeInfo) dataType;
- return new DecimalType(decimalType.precision(), decimalType.scale());
- }
+
if (dataType instanceof MapTypeInfo) {
MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
return new MapType<>(
@@ -160,10 +160,7 @@ public class TypeConverterUtils {
if (bridgedType != null) {
return bridgedType.getFlinkType();
}
- if (dataType instanceof DecimalType) {
- DecimalType decimalType = (DecimalType) dataType;
- return new BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
- }
+
if (dataType instanceof MapType) {
MapType<?, ?> mapType = (MapType<?, ?>) dataType;
return new MapTypeInfo<>(