Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 14:04:44 UTC

[inlong] 02/07: [INLONG-6409][Sort] Fix unsupported Time and Timestamp types for Iceberg auto table creation in Spark session queries (#6424)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 19ba51d4a9d609b520c4b18f7cab14024c74894f
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Nov 8 16:00:05 2022 +0800

    [INLONG-6409][Sort] Fix unsupported Time and Timestamp types for Iceberg auto table creation in Spark session queries (#6424)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../org/apache/inlong/sort/base/Constants.java     |   7 +
 .../base/format/AbstractDynamicSchemaFormat.java   |   7 -
 .../base/format/CanalJsonDynamicSchemaFormat.java  |  25 +-
 .../format/DebeziumJsonDynamicSchemaFormat.java    |  24 +-
 .../base/format/DynamicSchemaFormatFactory.java    |  39 +-
 .../sort/base/format/JsonDynamicSchemaFormat.java  |  50 ++-
 .../sort/base/format/JsonToRowDataConverters.java  | 414 +++++++++++++++++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  |  24 +-
 .../format/CanalJsonDynamicSchemaFormatTest.java   |  12 +-
 .../DebeziumJsonDynamicSchemaFormatTest.java       |   3 +-
 ...eziumJsonDynamicSchemaFormatWithSchemaTest.java |   3 +-
 .../sort/doris/table/DorisDynamicTableFactory.java |   6 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java     |   2 +
 .../inlong/sort/iceberg/IcebergTableSink.java      |   2 +
 .../sink/multiple/DynamicSchemaHandleOperator.java |   4 +-
 .../sort/kafka/table/KafkaDynamicTableFactory.java |   5 +-
 16 files changed, 539 insertions(+), 88 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19c58e9c1..9cc3c979d 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -163,4 +163,11 @@ public final class Constants {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
+
+    public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
+            ConfigOptions.key("sink.multiple.typemap-compatible-with-spark")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Spark does not support the Iceberg data types `timestamp without time zone` and "
+                            + "`time`, so type conversions must map them to types supported by Spark.");
 }
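
For reference, a minimal sketch (not part of this commit) of how the new option can be enabled on an Iceberg multiple sink from a Flink SQL job. Everything here except the `sink.multiple.typemap-compatible-with-spark` key is illustrative: the table name, the connector identifier, and the other property keys and values are assumptions.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SparkCompatibleSinkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // All names and values below are hypothetical except the option key
            // added in Constants.java above.
            tEnv.executeSql(
                    "CREATE TABLE iceberg_multiple_sink (raw BYTES) WITH (\n"
                            + " 'connector' = 'iceberg-inlong',\n"
                            + " 'sink.multiple.enable' = 'true',\n"
                            + " 'sink.multiple.format' = 'canal-json',\n"
                            + " 'sink.multiple.database-pattern' = '${database}',\n"
                            + " 'sink.multiple.table-pattern' = '${table}',\n"
                            + " 'sink.multiple.typemap-compatible-with-spark' = 'true'\n"
                            + ")");
        }
    }
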
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
index 2def7bb4b..f32b0086e 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
@@ -187,11 +187,4 @@ public abstract class AbstractDynamicSchemaFormat<T> {
      * @throws IOException The exception that may be thrown when parsing fails
      */
     public abstract String parse(T data, String pattern) throws IOException;
-
-    /**
-     * Get the identifier of this dynamic schema format
-     *
-     * @return The identifier of this dynamic schema format
-     */
-    public abstract String identifier();
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index 88e51df7b..81256eda1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -17,22 +17,22 @@
 
 package org.apache.inlong.sort.base.format;
 
-import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Canal json dynamic format
  */
 public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
-    private static final String IDENTIFIER = "canal-json";
     private static final String DDL_FLAG = "ddl";
     private static final String DATA = "data";
     private static final String OLD = "old";
@@ -43,15 +43,8 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
     private static final String OP_UPDATE = "UPDATE";
     private static final String OP_DELETE = "DELETE";
 
-    private static final CanalJsonDynamicSchemaFormat FORMAT = new CanalJsonDynamicSchemaFormat();
-
-    private CanalJsonDynamicSchemaFormat() {
-
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static AbstractDynamicSchemaFormat getInstance() {
-        return FORMAT;
+    protected CanalJsonDynamicSchemaFormat(Map<String, String> props) {
+        super(props);
     }
 
     @Override
@@ -170,14 +163,4 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
         return rowDataList;
     }
-
-    /**
-     * Get the identifier of this dynamic schema format
-     *
-     * @return The identifier of this dynamic schema format
-     */
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index 9841d01f6..d535b87d1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.base.format;
 
-import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.RowData;
@@ -33,6 +32,7 @@ import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -44,7 +44,6 @@ import java.util.Map;
  */
 public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
-    private static final String IDENTIFIER = "debezium-json";
     private static final String DDL_FLAG = "ddl";
     private static final String SCHEMA = "schema";
     private static final String SQL_TYPE = "sqlType";
@@ -87,15 +86,8 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
                     .put("BYTES", new VarBinaryType())
                     .build();
 
-    private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
-
-    private DebeziumJsonDynamicSchemaFormat() {
-
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static AbstractDynamicSchemaFormat getInstance() {
-        return FORMAT;
+    protected DebeziumJsonDynamicSchemaFormat(Map<String, String> props) {
+        super(props);
     }
 
     @Override
@@ -289,16 +281,6 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
         return extractRowData(payload, rowType);
     }
 
-    /**
-     * Get the identifier of this dynamic schema format
-     *
-     * @return The identifier of this dynamic schema format
-     */
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
-
     private LogicalType debeziumType2FlinkType(String debeziumType) {
         if (DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.containsKey(debeziumType.toUpperCase())) {
             return DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.get(debeziumType.toUpperCase());
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
index 65a5b5e78..c260bdeaa 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
@@ -17,26 +17,36 @@
 
 package org.apache.inlong.sort.base.format;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
 
 /**
  * Dynamic schema format factory
  */
 public class DynamicSchemaFormatFactory {
 
-    public static final List<AbstractDynamicSchemaFormat<?>> SUPPORT_FORMATS =
-            new ArrayList<AbstractDynamicSchemaFormat<?>>() {
+    public static Map<String, Function<Map<String, String>, AbstractDynamicSchemaFormat>> SUPPORT_FORMATS =
+            ImmutableMap.of(
+                    "canal-json", props -> new CanalJsonDynamicSchemaFormat(props),
+                    "debezium-json", props -> new DebeziumJsonDynamicSchemaFormat(props)
+            );
 
-                private static final long serialVersionUID = 1L;
+    /**
+     * Get the format for the given identifier; only [canal-json|debezium-json] are supported for now
+     *
+     * @param identifier The identifier of this format
+     * @return The dynamic format
+     */
+    @SuppressWarnings("rawtypes")
+    public static AbstractDynamicSchemaFormat getFormat(String identifier) {
+        return getFormat(identifier, new HashMap<>());
+    }
 
-                {
-                    add(CanalJsonDynamicSchemaFormat.getInstance());
-                    add(DebeziumJsonDynamicSchemaFormat.getInstance());
-                }
-            };
 
     /**
      * Get the format for the given identifier; only [canal-json|debezium-json] are supported for now
@@ -45,10 +55,11 @@ public class DynamicSchemaFormatFactory {
      * @return The dynamic format
      */
     @SuppressWarnings("rawtypes")
-    public static AbstractDynamicSchemaFormat getFormat(String identifier) {
+    public static AbstractDynamicSchemaFormat getFormat(String identifier, Map<String, String> properties) {
         Preconditions.checkNotNull(identifier, "The identifier is null");
-        return Preconditions.checkNotNull(SUPPORT_FORMATS.stream().filter(s -> s.identifier().equals(identifier))
-                .findFirst().orElse(null), "Unsupport dynamic schema format for:" + identifier);
+        return Optional.ofNullable(SUPPORT_FORMATS.get(identifier))
+                .orElseThrow(() ->
+                        new UnsupportedOperationException("Unsupported dynamic schema format for: " + identifier))
+                .apply(properties);
     }
-
 }
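
A short caller-side sketch of the reworked factory (assumed usage, not code from this commit): formats are now constructed per call from a property map instead of being shared singletons, so each sink can carry its own Spark-compatibility setting.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
    import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;

    public class FormatFactorySketch {
        @SuppressWarnings("rawtypes")
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("sink.multiple.typemap-compatible-with-spark", "true");
            // Each call builds a fresh format instance configured by the properties.
            AbstractDynamicSchemaFormat format =
                    DynamicSchemaFormatFactory.getFormat("canal-json", props);
            System.out.println(format.getClass().getSimpleName());
            // An unknown identifier now fails fast:
            // DynamicSchemaFormatFactory.getFormat("avro") -> UnsupportedOperationException
        }
    }
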
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index a0d564d0c..e6fc75528 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -17,8 +17,9 @@
 
 package org.apache.inlong.sort.base.format;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonToRowDataConverters;
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -51,6 +52,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Matcher;
 
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
+
 /**
  * Json dynamic format class
  * This class main handle:
@@ -64,6 +67,7 @@ import java.util.regex.Matcher;
  * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
  * the result of parsing will be 'prefix_1_2_3_suffix'
  */
+@SuppressWarnings("LanguageDetectionInspection")
 public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
 
     /**
@@ -71,7 +75,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
      */
     private static final Integer FIRST = 0;
 
-    private static final Map<Integer, LogicalType> SQL_TYPE_2_ICEBERG_TYPE_MAPPING =
+    private static final Map<Integer, LogicalType> SQL_TYPE_2_FLINK_TYPE_MAPPING =
             ImmutableMap.<Integer, LogicalType>builder()
                     .put(java.sql.Types.CHAR, new CharType())
                     .put(java.sql.Types.VARCHAR, new VarCharType())
@@ -94,12 +98,44 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
                     .put(java.sql.Types.BOOLEAN, new BooleanType())
                     .put(java.sql.Types.OTHER, new VarCharType())
                     .build();
+
+    private static final Map<Integer, LogicalType> SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
+            ImmutableMap.<Integer, LogicalType>builder()
+                    .put(java.sql.Types.CHAR, new CharType())
+                    .put(java.sql.Types.VARCHAR, new VarCharType())
+                    .put(java.sql.Types.SMALLINT, new SmallIntType())
+                    .put(java.sql.Types.INTEGER, new IntType())
+                    .put(java.sql.Types.BIGINT, new BigIntType())
+                    .put(java.sql.Types.REAL, new FloatType())
+                    .put(java.sql.Types.DOUBLE, new DoubleType())
+                    .put(java.sql.Types.FLOAT, new FloatType())
+                    .put(java.sql.Types.DECIMAL, new DecimalType())
+                    .put(java.sql.Types.NUMERIC, new DecimalType())
+                    .put(java.sql.Types.BIT, new BooleanType())
+                    .put(java.sql.Types.TIME, new VarCharType())
+                    .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new LocalZonedTimestampType())
+                    .put(java.sql.Types.TIMESTAMP, new LocalZonedTimestampType())
+                    .put(java.sql.Types.BINARY, new BinaryType())
+                    .put(java.sql.Types.VARBINARY, new VarBinaryType())
+                    .put(java.sql.Types.BLOB, new VarBinaryType())
+                    .put(java.sql.Types.DATE, new DateType())
+                    .put(java.sql.Types.BOOLEAN, new BooleanType())
+                    .put(java.sql.Types.OTHER, new VarCharType())
+                    .build();
+
     public final ObjectMapper objectMapper = new ObjectMapper();
     protected final JsonToRowDataConverters rowDataConverters;
+    protected final boolean adaptSparkEngine;
 
-    protected JsonDynamicSchemaFormat() {
+    public JsonDynamicSchemaFormat(Map<String, String> properties) {
+        ReadableConfig config = Configuration.fromMap(properties);
+        this.adaptSparkEngine = config.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
         this.rowDataConverters =
-                new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601);
+                new JsonToRowDataConverters(
+                        false,
+                        false,
+                        TimestampFormat.ISO_8601,
+                        adaptSparkEngine);
     }
 
     /**
@@ -286,8 +322,10 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
     }
 
     private LogicalType sqlType2FlinkType(int jdbcType) {
-        if (SQL_TYPE_2_ICEBERG_TYPE_MAPPING.containsKey(jdbcType)) {
-            return SQL_TYPE_2_ICEBERG_TYPE_MAPPING.get(jdbcType);
+        Map<Integer, LogicalType> typeMap = adaptSparkEngine
+                ? SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING : SQL_TYPE_2_FLINK_TYPE_MAPPING;
+        if (typeMap.containsKey(jdbcType)) {
+            return typeMap.get(jdbcType);
         } else {
             throw new IllegalArgumentException("Unsupported jdbcType: " + jdbcType);
         }
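
To make the effect of the flag concrete, here is a hedged illustration of how the two mappings diverge for the time types. The default-map entries for TIME and TIMESTAMP are assumed from the unchanged part of SQL_TYPE_2_FLINK_TYPE_MAPPING, which this hunk does not show.

    import java.sql.Types;
    import java.util.Map;

    import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.TimeType;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class TypeMappingIllustration {
        public static void main(String[] args) {
            // Assumed default entries (from the part of the map not shown above).
            Map<Integer, LogicalType> defaults = ImmutableMap.of(
                    Types.TIME, new TimeType(),
                    Types.TIMESTAMP, new TimestampType());
            // Spark-compatible entries, as added by this commit.
            Map<Integer, LogicalType> sparkCompatible = ImmutableMap.of(
                    Types.TIME, new VarCharType(),
                    Types.TIMESTAMP, new LocalZonedTimestampType());
            System.out.println(defaults.get(Types.TIME).asSummaryString()
                    + " -> " + sparkCompatible.get(Types.TIME).asSummaryString());
            System.out.println(defaults.get(Types.TIMESTAMP).asSummaryString()
                    + " -> " + sparkCompatible.get(Types.TIMESTAMP).asSummaryString());
        }
    }
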
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
new file mode 100644
index 000000000..da6128942
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
@@ -0,0 +1,414 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
+
+/** Tool class used to convert from {@link JsonNode} to {@link RowData}. */
+@Internal
+public class JsonToRowDataConverters implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Flag indicating whether to fail if a field is missing. */
+    private final boolean failOnMissingField;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    /** Timestamp format specification which is used to parse timestamp. */
+    private final TimestampFormat timestampFormat;
+
+    /** Whether to adapt output for Spark SQL engines. */
+    private final boolean adaptSpark;
+
+    public JsonToRowDataConverters(
+            boolean failOnMissingField,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat,
+            boolean adaptSpark) {
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.adaptSpark = adaptSpark;
+    }
+
+    /**
+     * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL internal
+     * data structures.
+     */
+    @FunctionalInterface
+    public interface JsonToRowDataConverter extends Serializable {
+        Object convert(JsonNode jsonNode);
+    }
+
+    /** Creates a runtime converter which is null safe. */
+    public JsonToRowDataConverter createConverter(LogicalType type) {
+        return wrapIntoNullableConverter(createNotNullConverter(type));
+    }
+
+    /** Creates a runtime converter which assuming input object is not null. */
+    private JsonToRowDataConverter createNotNullConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return jsonNode -> null;
+            case BOOLEAN:
+                return this::convertToBoolean;
+            case TINYINT:
+                return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+            case SMALLINT:
+                return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return this::convertToInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return this::convertToLong;
+            case DATE:
+                return this::convertToDate;
+            case TIME_WITHOUT_TIME_ZONE:
+                return this::convertToTime;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return this::convertToTimestamp;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (adaptSpark) {
+                    return jsonNode -> {
+                        try {
+                            return convertToTimestampWithLocalZone(jsonNode);
+                        } catch (DateTimeParseException e) {
+                            return convertToTimestamp(jsonNode);
+                        }
+                    };
+                }
+                return this::convertToTimestampWithLocalZone;
+            case FLOAT:
+                return this::convertToFloat;
+            case DOUBLE:
+                return this::convertToDouble;
+            case CHAR:
+            case VARCHAR:
+                return this::convertToString;
+            case BINARY:
+            case VARBINARY:
+                return this::convertToBytes;
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case MAP:
+                MapType mapType = (MapType) type;
+                return createMapConverter(
+                        mapType.asSummaryString(), mapType.getKeyType(), mapType.getValueType());
+            case MULTISET:
+                MultisetType multisetType = (MultisetType) type;
+                return createMapConverter(
+                        multisetType.asSummaryString(),
+                        multisetType.getElementType(),
+                        new IntType());
+            case ROW:
+                return createRowConverter((RowType) type);
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private boolean convertToBoolean(JsonNode jsonNode) {
+        if (jsonNode.isBoolean()) {
+            // avoid redundant toString and parseBoolean, for better performance
+            return jsonNode.asBoolean();
+        } else {
+            return Boolean.parseBoolean(jsonNode.asText().trim());
+        }
+    }
+
+    private int convertToInt(JsonNode jsonNode) {
+        if (jsonNode.canConvertToInt()) {
+            // avoid redundant toString and parseInt, for better performance
+            return jsonNode.asInt();
+        } else {
+            return Integer.parseInt(jsonNode.asText().trim());
+        }
+    }
+
+    private long convertToLong(JsonNode jsonNode) {
+        if (jsonNode.canConvertToLong()) {
+            // avoid redundant toString and parseLong, for better performance
+            return jsonNode.asLong();
+        } else {
+            return Long.parseLong(jsonNode.asText().trim());
+        }
+    }
+
+    private double convertToDouble(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return jsonNode.asDouble();
+        } else {
+            return Double.parseDouble(jsonNode.asText().trim());
+        }
+    }
+
+    private float convertToFloat(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return (float) jsonNode.asDouble();
+        } else {
+            return Float.parseFloat(jsonNode.asText().trim());
+        }
+    }
+
+    private int convertToDate(JsonNode jsonNode) {
+        LocalDate date = ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+        return (int) date.toEpochDay();
+    }
+
+    private int convertToTime(JsonNode jsonNode) {
+        TemporalAccessor parsedTime = SQL_TIME_FORMAT.parse(jsonNode.asText());
+        LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+        // get number of milliseconds of the day
+        return localTime.toSecondOfDay() * 1000;
+    }
+
+    private TimestampData convertToTimestamp(JsonNode jsonNode) {
+        TemporalAccessor parsedTimestamp;
+        switch (timestampFormat) {
+            case SQL:
+                parsedTimestamp = SQL_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+                break;
+            case ISO_8601:
+                parsedTimestamp = ISO8601_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+                break;
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported timestamp format '%s'. Validator should have checked that.",
+                                timestampFormat));
+        }
+        LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+        LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+
+        return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
+    }
+
+    private TimestampData convertToTimestampWithLocalZone(JsonNode jsonNode) {
+        TemporalAccessor parsedTimestampWithLocalZone;
+        switch (timestampFormat) {
+            case SQL:
+                parsedTimestampWithLocalZone =
+                        SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+                break;
+            case ISO_8601:
+                parsedTimestampWithLocalZone =
+                        ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+                break;
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported timestamp format '%s'. Validator should have checked that.",
+                                timestampFormat));
+        }
+        LocalTime localTime = parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
+        LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
+
+        return TimestampData.fromInstant(
+                LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC));
+    }
+
+    private StringData convertToString(JsonNode jsonNode) {
+        if (jsonNode.isContainerNode()) {
+            return StringData.fromString(jsonNode.toString());
+        } else {
+            return StringData.fromString(jsonNode.asText());
+        }
+    }
+
+    private byte[] convertToBytes(JsonNode jsonNode) {
+        try {
+            return jsonNode.binaryValue();
+        } catch (IOException e) {
+            throw new JsonParseException("Unable to deserialize byte array.", e);
+        }
+    }
+
+    private JsonToRowDataConverter createDecimalConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return jsonNode -> {
+            BigDecimal bigDecimal;
+            if (jsonNode.isBigDecimal()) {
+                bigDecimal = jsonNode.decimalValue();
+            } else {
+                bigDecimal = new BigDecimal(jsonNode.asText());
+            }
+            return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+        };
+    }
+
+    private JsonToRowDataConverter createArrayConverter(ArrayType arrayType) {
+        JsonToRowDataConverter elementConverter = createConverter(arrayType.getElementType());
+        final Class<?> elementClass =
+                LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+        return jsonNode -> {
+            final ArrayNode node = (ArrayNode) jsonNode;
+            final Object[] array = (Object[]) Array.newInstance(elementClass, node.size());
+            for (int i = 0; i < node.size(); i++) {
+                final JsonNode innerNode = node.get(i);
+                array[i] = elementConverter.convert(innerNode);
+            }
+            return new GenericArrayData(array);
+        };
+    }
+
+    private JsonToRowDataConverter createMapConverter(
+            String typeSummary, LogicalType keyType, LogicalType valueType) {
+        if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+            throw new UnsupportedOperationException(
+                    "JSON format doesn't support non-string as key type of map. "
+                            + "The type is: "
+                            + typeSummary);
+        }
+        final JsonToRowDataConverter keyConverter = createConverter(keyType);
+        final JsonToRowDataConverter valueConverter = createConverter(valueType);
+
+        return jsonNode -> {
+            Iterator<Entry<String, JsonNode>> fields = jsonNode.fields();
+            Map<Object, Object> result = new HashMap<>();
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> entry = fields.next();
+                Object key = keyConverter.convert(TextNode.valueOf(entry.getKey()));
+                Object value = valueConverter.convert(entry.getValue());
+                result.put(key, value);
+            }
+            return new GenericMapData(result);
+        };
+    }
+
+    public JsonToRowDataConverter createRowConverter(RowType rowType) {
+        final JsonToRowDataConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(this::createConverter)
+                        .toArray(JsonToRowDataConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return jsonNode -> {
+            ObjectNode node = (ObjectNode) jsonNode;
+            int arity = fieldNames.length;
+            GenericRowData row = new GenericRowData(arity);
+            for (int i = 0; i < arity; i++) {
+                String fieldName = fieldNames[i];
+                JsonNode field = node.get(fieldName);
+                Object convertedField = convertField(fieldConverters[i], fieldName, field);
+                row.setField(i, convertedField);
+            }
+            return row;
+        };
+    }
+
+    private Object convertField(
+            JsonToRowDataConverter fieldConverter, String fieldName, JsonNode field) {
+        if (field == null) {
+            if (failOnMissingField) {
+                throw new JsonParseException("Could not find field with name '" + fieldName + "'.");
+            } else {
+                return null;
+            }
+        } else {
+            return fieldConverter.convert(field);
+        }
+    }
+
+    private JsonToRowDataConverter wrapIntoNullableConverter(
+            JsonToRowDataConverter converter) {
+        return jsonNode -> {
+            if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
+                return null;
+            }
+            try {
+                return converter.convert(jsonNode);
+            } catch (Throwable t) {
+                if (!ignoreParseErrors) {
+                    throw t;
+                }
+                return null;
+            }
+        };
+    }
+
+    /** Exception which refers to parse errors in converters. */
+    private static final class JsonParseException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+
+        public JsonParseException(String message) {
+            super(message);
+        }
+
+        public JsonParseException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
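
A hedged usage sketch of the new converter from caller code (the class and values are illustrative, not from this commit). The `ts` field shows the Spark-compatibility path: with adaptSpark set, a zone-less timestamp string that fails the WITH LOCAL TIME ZONE parser falls back to the plain timestamp parser instead of failing the whole record.

    import org.apache.flink.formats.common.TimestampFormat;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.inlong.sort.base.format.JsonToRowDataConverters;
    import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter;

    public class ConverterUsageSketch {
        public static void main(String[] args) throws Exception {
            RowType rowType = RowType.of(
                    new LogicalType[]{new IntType(), new VarCharType(), new LocalZonedTimestampType()},
                    new String[]{"id", "name", "ts"});
            // adaptSpark = true: TIMESTAMP_LTZ values without a zone designator are
            // re-parsed as plain timestamps rather than raising a parse error.
            JsonToRowDataConverters converters =
                    new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601, true);
            JsonToRowDataConverter converter = converters.createRowConverter(rowType);
            RowData row = (RowData) converter.convert(new ObjectMapper()
                    .readTree("{\"id\": 1, \"name\": \"inlong\", \"ts\": \"2022-11-08T16:00:05\"}"));
            System.out.println(row.getArity()); // 3
        }
    }
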
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 77c924b95..10bc3f3d1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -18,11 +18,14 @@
 
 package org.apache.inlong.sort.base.sink;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.Map;
 
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE;
 import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
 import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.TRY_IT_BEST;
@@ -37,6 +40,8 @@ public class MultipleSinkOption implements Serializable {
 
     private String format;
 
+    private boolean sparkEngineEnable;
+
     private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
 
     private String databasePattern;
@@ -44,10 +49,12 @@ public class MultipleSinkOption implements Serializable {
     private String tablePattern;
 
     public MultipleSinkOption(String format,
+            boolean sparkEngineEnable,
             SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String databasePattern,
             String tablePattern) {
         this.format = format;
+        this.sparkEngineEnable = sparkEngineEnable;
         this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
@@ -57,6 +64,15 @@ public class MultipleSinkOption implements Serializable {
         return format;
     }
 
+    public boolean isSparkEngineEnable() {
+        return sparkEngineEnable;
+    }
+
+    public Map<String, String> getFormatOption() {
+        return ImmutableMap.of(
+                SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK.key(), String.valueOf(isSparkEngineEnable()));
+    }
+
     public SchemaUpdateExceptionPolicy getSchemaUpdatePolicy() {
         return schemaUpdatePolicy;
     }
@@ -75,6 +91,7 @@ public class MultipleSinkOption implements Serializable {
 
     public static class Builder {
         private String format;
+        private boolean sparkEngineEnable;
         private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
         private String databasePattern;
         private String tablePattern;
@@ -84,6 +101,11 @@ public class MultipleSinkOption implements Serializable {
             return this;
         }
 
+        public MultipleSinkOption.Builder withSparkEngineEnable(boolean sparkEngineEnable) {
+            this.sparkEngineEnable = sparkEngineEnable;
+            return this;
+        }
+
         public MultipleSinkOption.Builder withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
             this.schemaUpdatePolicy = schemaUpdatePolicy;
             return this;
@@ -100,7 +122,7 @@ public class MultipleSinkOption implements Serializable {
         }
 
         public MultipleSinkOption build() {
-            return new MultipleSinkOption(format, schemaUpdatePolicy, databasePattern, tablePattern);
+            return new MultipleSinkOption(format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern);
         }
     }
 
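
A brief sketch of how the flag travels from the sink options into the format (this mirrors the IcebergTableSink and DynamicSchemaHandleOperator changes further down; the pattern values are illustrative):

    import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
    import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
    import org.apache.inlong.sort.base.sink.MultipleSinkOption;
    import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;

    public class MultipleSinkOptionSketch {
        @SuppressWarnings("rawtypes")
        public static void main(String[] args) {
            MultipleSinkOption option = MultipleSinkOption.builder()
                    .withFormat("canal-json")
                    .withSparkEngineEnable(true)
                    .withDatabasePattern("${database}")
                    .withTablePattern("${table}")
                    .withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
                    .build();
            // getFormatOption() repackages the flag under the option key so the
            // factory can pass it to the JsonDynamicSchemaFormat constructor.
            AbstractDynamicSchemaFormat format = DynamicSchemaFormatFactory.getFormat(
                    option.getFormat(), option.getFormatOption());
            System.out.println(format.getClass().getSimpleName());
        }
    }
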
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index dc7ce5ec8..29dbcb6a1 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
  * Test for {@link CanalJsonDynamicSchemaFormat}
  */
 public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+    private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("canal-json");
 
     @Override
     protected String getSource() {
@@ -91,6 +92,11 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
         return expectedValues;
     }
 
+    @Override
+    protected AbstractDynamicSchemaFormat<JsonNode> getDynamicSchemaFormat() {
+        return schemaFormat;
+    }
+
     @Test
     @SuppressWarnings({"unchecked"})
     public void testExtractPrimaryKey() throws IOException {
@@ -119,10 +125,4 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
                 5.18f));
         Assert.assertEquals(values, rowDataList);
     }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
-        return CanalJsonDynamicSchemaFormat.getInstance();
-    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
index 5e555a657..c0eaface9 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
  * Test for {@link DebeziumJsonDynamicSchemaFormat}
  */
 public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+    private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json");
 
     @Override
     protected String getSource() {
@@ -98,6 +99,6 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
-        return DebeziumJsonDynamicSchemaFormat.getInstance();
+        return schemaFormat;
     }
 }
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
index fb268b9a7..d4a3dd188 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
  * Test for {@link DebeziumJsonDynamicSchemaFormat}
  */
 public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+    private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json");
 
     @Override
     protected String getSource() {
@@ -270,6 +271,6 @@ public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchema
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
-        return DebeziumJsonDynamicSchemaFormat.getInstance();
+        return schemaFormat;
     }
 }
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index f3c263f93..30ef92b69 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -35,16 +35,13 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.utils.TableSchemaUtils;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 
 import java.time.Duration;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.stream.Collectors;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -332,8 +329,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
                                 + "is not allowed blank when the option 'sink.multiple.enable' is 'true'");
             }
             DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
-            List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map(
-                    AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList());
+            Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
             if (!supportFormats.contains(sinkMultipleFormat)) {
                 throw new ValidationException(String.format(
                         "Unsupported value '%s' for '%s'. "
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 4852bca33..040e73df6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -55,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
 /**
@@ -241,6 +242,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_DATABASE_PATTERN);
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
         return options;
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ee7cf2c89..67ca5fdf2 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -49,6 +49,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
 /**
@@ -104,6 +105,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                     .multipleSink(tableOptions.get(SINK_MULTIPLE_ENABLE))
                     .multipleSinkOption(MultipleSinkOption.builder()
                             .withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT))
+                            .withSparkEngineEnable(tableOptions.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK))
                             .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
                             .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
                             .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index a0a9092c6..378f040d9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -94,7 +94,9 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         this.catalog = catalogLoader.loadCatalog();
         this.asNamespaceCatalog =
                 catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
-        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(
+                multipleSinkOption.getFormat(), multipleSinkOption.getFormatOption());
+
         this.processingTimeService = getRuntimeContext().getProcessingTimeService();
         processingTimeService.registerTimer(
                 processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 480d55948..69716dfb6 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -51,7 +51,6 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink;
 import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
@@ -66,7 +65,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
@@ -440,8 +438,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         if (valueEncodingFormat instanceof RawFormatSerializationSchema
                 && StringUtils.isNotBlank(sinkMultipleFormat)) {
             DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
-            List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map(
-                    AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList());
+            Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
             if (!supportFormats.contains(sinkMultipleFormat)) {
                 throw new ValidationException(String.format(
                         "Unsupported value '%s' for '%s'. "