You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/09/25 04:14:04 UTC

[seatunnel] branch dev updated: [Feature][Flink] Support Decimal Type with configurable precision and scale (#5419)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7348be2a7a [Feature][Flink] Support Decimal Type with configurable precision and scale (#5419)
7348be2a7a is described below

commit 7348be2a7a530704cfaffd77e4bf552639618f31
Author: Chengyu Yan <ch...@hotmail.com>
AuthorDate: Mon Sep 25 12:13:58 2023 +0800

    [Feature][Flink] Support Decimal Type with configurable precision and scale (#5419)
---
 release-note.md                                    |  1 +
 .../flink/execution/FlinkRuntimeEnvironment.java   | 25 ++++++++++++++++-
 .../flink/execution/SinkExecuteProcessor.java      |  5 ++--
 .../FlinkAbstractPluginExecuteProcessor.java       | 32 ++++++++++++++++++++++
 .../flink/execution/FlinkRuntimeEnvironment.java   | 25 +++++++++++++++++
 .../flink/execution/SinkExecuteProcessor.java      |  5 ++--
 .../flink/execution/SourceExecuteProcessor.java    |  4 +++
 .../flink/execution/TransformExecuteProcessor.java | 14 ++++++----
 .../src/test/resources/fake_to_paimon.conf         |  1 +
 .../src/test/resources/paimon_to_assert.conf       |  2 +-
 .../flink/utils/TypeConverterUtilsTest.java        | 11 ++++++--
 .../flink/serialization/FlinkRowConverter.java     | 24 ++++++++++++++++
 .../flink/utils/TypeConverterUtils.java            | 21 ++++++--------
 13 files changed, 141 insertions(+), 29 deletions(-)

diff --git a/release-note.md b/release-note.md
index 27f57f4ea7..f71a24636a 100644
--- a/release-note.md
+++ b/release-note.md
@@ -148,6 +148,7 @@
 - [Core] [API] Add copy method to Catalog codes (#4414)
 - [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424)
 - [Core] [Shade] Add guava shade module (#4358)
+- [Core] [Flink] Support Decimal Type with configurable precision and scale (#5419)
 
 ### Connector-V2
 
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 996c9698fb..4bd81769fb 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -51,8 +52,11 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -64,7 +68,8 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
     private StreamExecutionEnvironment environment;
 
     private StreamTableEnvironment tableEnvironment;
-
+    private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
+    private Optional<SeaTunnelRowType> defaultType = Optional.empty();
     private JobMode jobMode;
 
     private String jobName = Constants.LOGO;
@@ -334,6 +339,24 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
                 name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
+    public void stageType(String tblName, SeaTunnelRowType type) {
+        stagedTypes.put(tblName, type);
+    }
+
+    public void stageDefaultType(SeaTunnelRowType type) {
+        this.defaultType = Optional.of(type);
+    }
+
+    public Optional<SeaTunnelRowType> type(String tblName) {
+        return stagedTypes.containsKey(tblName)
+                ? Optional.of(stagedTypes.get(tblName))
+                : Optional.empty();
+    }
+
+    public Optional<SeaTunnelRowType> defaultType() {
+        return this.defaultType;
+    }
+
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 03bd2077e5..cf0ad1e7be 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -31,7 +31,6 @@ import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -101,8 +100,8 @@ public class SinkExecuteProcessor
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                     plugins.get(i);
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
-            seaTunnelSink.setTypeInfo(
-                    (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
+            SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
+            seaTunnelSink.setTypeInfo(sourceType);
             if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                 SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
                 DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 6c61f61b95..ed8b72a8f0 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -20,9 +20,11 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
 import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
@@ -117,6 +119,36 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
         }
     }
 
+    protected void stageType(Config pluginConfig, SeaTunnelRowType type) {
+        if (!flinkRuntimeEnvironment.defaultType().isPresent()) {
+            flinkRuntimeEnvironment.stageDefaultType(type);
+        }
+
+        if (pluginConfig.hasPath("result_table_name")) {
+            String tblName = pluginConfig.getString("result_table_name");
+            flinkRuntimeEnvironment.stageType(tblName, type);
+        }
+    }
+
+    protected Optional<SeaTunnelRowType> sourceType(Config pluginConfig) {
+        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+            String tblName = pluginConfig.getString(SOURCE_TABLE_NAME);
+            return flinkRuntimeEnvironment.type(tblName);
+        } else {
+            return flinkRuntimeEnvironment.defaultType();
+        }
+    }
+
+    protected SeaTunnelRowType initSourceType(Config sinkConfig, DataStream<Row> stream) {
+        SeaTunnelRowType sourceType =
+                sourceType(sinkConfig)
+                        .orElseGet(
+                                () ->
+                                        (SeaTunnelRowType)
+                                                TypeConverterUtils.convert(stream.getType()));
+        return sourceType;
+    }
+
     protected abstract List<T> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs);
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 12168921d8..d8ff813f12 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -51,8 +52,11 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -65,6 +69,9 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
 
     private StreamTableEnvironment tableEnvironment;
 
+    private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
+    private Optional<SeaTunnelRowType> defaultType = Optional.empty();
+
     private JobMode jobMode;
 
     private String jobName = Constants.LOGO;
@@ -334,6 +341,24 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
                 name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
+    public void stageType(String tblName, SeaTunnelRowType type) {
+        stagedTypes.put(tblName, type);
+    }
+
+    public void stageDefaultType(SeaTunnelRowType type) {
+        this.defaultType = Optional.of(type);
+    }
+
+    public Optional<SeaTunnelRowType> type(String tblName) {
+        return stagedTypes.containsKey(tblName)
+                ? Optional.of(stagedTypes.get(tblName))
+                : Optional.empty();
+    }
+
+    public Optional<SeaTunnelRowType> defaultType() {
+        return this.defaultType;
+    }
+
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index ca9a05f632..340351d1d4 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -31,7 +31,6 @@ import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -102,8 +101,8 @@ public class SinkExecuteProcessor
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                     plugins.get(i);
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
-            seaTunnelSink.setTypeInfo(
-                    (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
+            SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
+            seaTunnelSink.setTypeInfo(sourceType);
             if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                 SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
                 DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index f3ebdd0437..d74726a133 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -76,12 +77,15 @@ public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<
             boolean bounded =
                     internalSource.getBoundedness()
                             == org.apache.seatunnel.api.source.Boundedness.BOUNDED;
+
             DataStreamSource<Row> sourceStream =
                     addSource(
                             executionEnvironment,
                             sourceFunction,
                             "SeaTunnel " + internalSource.getClass().getSimpleName(),
                             bounded);
+            stageType(pluginConfig, (SeaTunnelRowType) internalSource.getProducedType());
+
             if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
                 sourceStream.setParallelism(parallelism);
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index a358fb6f33..0dc36c62b0 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -97,7 +97,10 @@ public class TransformExecuteProcessor
                 SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
                 Config pluginConfig = pluginConfigs.get(i);
                 DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
-                input = flinkTransform(transform, stream);
+                SeaTunnelRowType sourceType = initSourceType(pluginConfig, stream);
+                transform.setTypeInfo(sourceType);
+                input = flinkTransform(sourceType, transform, stream);
+                stageType(pluginConfig, (SeaTunnelRowType) transform.getProducedType());
                 registerResultTable(pluginConfig, input);
                 result.add(input);
             } catch (Exception e) {
@@ -111,11 +114,10 @@ public class TransformExecuteProcessor
         return result;
     }
 
-    protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {
-        SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType());
-        transform.setTypeInfo(seaTunnelDataType);
+    protected DataStream<Row> flinkTransform(
+            SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
         TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
-        FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType);
+        FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
         FlinkRowConverter transformOutputRowConverter =
                 new FlinkRowConverter(transform.getProducedType());
         DataStream<Row> output =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
index 8afd81989c..8e5f00ee7b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
@@ -27,6 +27,7 @@ env {
 
 source {
   FakeSource {
+    row.num = 100000
     schema = {
       fields {
         c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
index d27893cedc..cbd39a0fb0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
@@ -44,7 +44,7 @@ sink {
       row_rules = [
         {
           rule_type = MAX_ROW
-          rule_value = 5
+          rule_value = 100000
         }
       ],
       field_rules = [
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
index d180af69aa..95cfa335e7 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
@@ -83,9 +83,14 @@ public class TypeConverterUtilsTest {
 
     @Test
     public void convertBigDecimalType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.BIG_DEC_TYPE_INFO,
-                TypeConverterUtils.convert(new DecimalType(30, 2)));
+        /**
+         * To avoid losing the precision and scale of a {@link
+         * org.apache.seatunnel.api.table.type.DecimalType}, a {@code DecimalType} instance is
+         * converted to {@link
+         * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO}.
+         */
+        Assertions.assertEquals(
+                BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(new DecimalType(30, 2)));
     }
 
     @Test
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
index fa8d88e052..a1278cdf85 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.translation.flink.serialization;
 
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -28,6 +29,8 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.BiFunction;
@@ -68,6 +71,15 @@ public class FlinkRowConverter extends RowConverter<Row> {
             case MAP:
                 return convertMap(
                         (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::convert);
+
+                /**
+                 * To avoid losing the precision and scale of a {@link
+                 * org.apache.seatunnel.api.table.type.DecimalType}, the {@link
+                 * java.math.BigDecimal} value is converted to a {@link java.lang.String}.
+                 */
+            case DECIMAL:
+                BigDecimal decimal = (BigDecimal) field;
+                return decimal.toString();
             default:
                 return field;
         }
@@ -122,6 +134,18 @@ public class FlinkRowConverter extends RowConverter<Row> {
             case MAP:
                 return convertMap(
                         (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::reconvert);
+
+                /**
+                 * The decimal travels through Flink as a {@link java.lang.String}; rebuild the
+                 * {@link java.math.BigDecimal} and keep the result of setScale (BigDecimal is
+                 * immutable) so the scale declared by the DecimalType is actually applied.
+                 */
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) dataType;
+                String decimalData = (String) field;
+                BigDecimal decimal = new BigDecimal(decimalData);
+                decimal = decimal.setScale(decimalType.getScale(), RoundingMode.HALF_UP);
+                return decimal;
             default:
                 return field;
         }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index fc8b4f6b3c..86fbd0833d 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
@@ -70,11 +69,15 @@ public class TypeConverterUtils {
                 BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
         BRIDGED_TYPES.put(
                 Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
-        // TODO: there is a still an unresolved issue that the BigDecimal type will lose the
-        // precision and scale
+        /**
+         * To avoid losing the precision and scale of a {@link
+         * org.apache.seatunnel.api.table.type.DecimalType}, decimal values are carried through
+         * Flink as strings, so the bridged Flink type is {@link
+         * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO}.
+         */
         BRIDGED_TYPES.put(
                 BigDecimal.class,
-                BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO));
+                BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.STRING_TYPE_INFO));
 
         // data time types
         BRIDGED_TYPES.put(
@@ -134,10 +137,7 @@ public class TypeConverterUtils {
         if (bridgedType != null) {
             return bridgedType.getSeaTunnelType();
         }
-        if (dataType instanceof BigDecimalTypeInfo) {
-            BigDecimalTypeInfo decimalType = (BigDecimalTypeInfo) dataType;
-            return new DecimalType(decimalType.precision(), decimalType.scale());
-        }
+
         if (dataType instanceof MapTypeInfo) {
             MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
             return new MapType<>(
@@ -160,10 +160,7 @@ public class TypeConverterUtils {
         if (bridgedType != null) {
             return bridgedType.getFlinkType();
         }
-        if (dataType instanceof DecimalType) {
-            DecimalType decimalType = (DecimalType) dataType;
-            return new BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
-        }
+
         if (dataType instanceof MapType) {
             MapType<?, ?> mapType = (MapType<?, ?>) dataType;
             return new MapTypeInfo<>(