You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 14:04:44 UTC
[inlong] 02/07: [INLONG-6409][Sort] Unsupported Time and Timestamp iceberg auto create table in spark session query (#6424)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 19ba51d4a9d609b520c4b18f7cab14024c74894f
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Nov 8 16:00:05 2022 +0800
[INLONG-6409][Sort] Unsupported Time and Timestamp iceberg auto create table in spark session query (#6424)
Co-authored-by: thesumery <15...@qq.com>
---
.../org/apache/inlong/sort/base/Constants.java | 7 +
.../base/format/AbstractDynamicSchemaFormat.java | 7 -
.../base/format/CanalJsonDynamicSchemaFormat.java | 25 +-
.../format/DebeziumJsonDynamicSchemaFormat.java | 24 +-
.../base/format/DynamicSchemaFormatFactory.java | 39 +-
.../sort/base/format/JsonDynamicSchemaFormat.java | 50 ++-
.../sort/base/format/JsonToRowDataConverters.java | 414 +++++++++++++++++++++
.../inlong/sort/base/sink/MultipleSinkOption.java | 24 +-
.../format/CanalJsonDynamicSchemaFormatTest.java | 12 +-
.../DebeziumJsonDynamicSchemaFormatTest.java | 3 +-
...eziumJsonDynamicSchemaFormatWithSchemaTest.java | 3 +-
.../sort/doris/table/DorisDynamicTableFactory.java | 6 +-
.../sort/iceberg/FlinkDynamicTableFactory.java | 2 +
.../inlong/sort/iceberg/IcebergTableSink.java | 2 +
.../sink/multiple/DynamicSchemaHandleOperator.java | 4 +-
.../sort/kafka/table/KafkaDynamicTableFactory.java | 5 +-
16 files changed, 539 insertions(+), 88 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19c58e9c1..9cc3c979d 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -163,4 +163,11 @@ public final class Constants {
.booleanType()
.defaultValue(true)
.withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
+
+ public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
+ ConfigOptions.key("sink.multiple.typemap-compatible-with-spark")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
+ + "`time`, so type conversions must be mapped to types supported by spark.");
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
index 2def7bb4b..f32b0086e 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
@@ -187,11 +187,4 @@ public abstract class AbstractDynamicSchemaFormat<T> {
* @throws IOException The exception will throws
*/
public abstract String parse(T data, String pattern) throws IOException;
-
- /**
- * Get the identifier of this dynamic schema format
- *
- * @return The identifier of this dynamic schema format
- */
- public abstract String identifier();
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index 88e51df7b..81256eda1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -17,22 +17,22 @@
package org.apache.inlong.sort.base.format;
-import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* Canal json dynamic format
*/
public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
- private static final String IDENTIFIER = "canal-json";
private static final String DDL_FLAG = "ddl";
private static final String DATA = "data";
private static final String OLD = "old";
@@ -43,15 +43,8 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
private static final String OP_UPDATE = "UPDATE";
private static final String OP_DELETE = "DELETE";
- private static final CanalJsonDynamicSchemaFormat FORMAT = new CanalJsonDynamicSchemaFormat();
-
- private CanalJsonDynamicSchemaFormat() {
-
- }
-
- @SuppressWarnings("rawtypes")
- public static AbstractDynamicSchemaFormat getInstance() {
- return FORMAT;
+ protected CanalJsonDynamicSchemaFormat(Map<String, String> props) {
+ super(props);
}
@Override
@@ -170,14 +163,4 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
return rowDataList;
}
-
- /**
- * Get the identifier of this dynamic schema format
- *
- * @return The identifier of this dynamic schema format
- */
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index 9841d01f6..d535b87d1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.base.format;
-import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.RowData;
@@ -33,6 +32,7 @@ import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter;
import java.io.IOException;
import java.util.ArrayList;
@@ -44,7 +44,6 @@ import java.util.Map;
*/
public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
- private static final String IDENTIFIER = "debezium-json";
private static final String DDL_FLAG = "ddl";
private static final String SCHEMA = "schema";
private static final String SQL_TYPE = "sqlType";
@@ -87,15 +86,8 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
.put("BYTES", new VarBinaryType())
.build();
- private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
-
- private DebeziumJsonDynamicSchemaFormat() {
-
- }
-
- @SuppressWarnings("rawtypes")
- public static AbstractDynamicSchemaFormat getInstance() {
- return FORMAT;
+ protected DebeziumJsonDynamicSchemaFormat(Map<String, String> props) {
+ super(props);
}
@Override
@@ -289,16 +281,6 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
return extractRowData(payload, rowType);
}
- /**
- * Get the identifier of this dynamic schema format
- *
- * @return The identifier of this dynamic schema format
- */
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
-
private LogicalType debeziumType2FlinkType(String debeziumType) {
if (DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.containsKey(debeziumType.toUpperCase())) {
return DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.get(debeziumType.toUpperCase());
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
index 65a5b5e78..c260bdeaa 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
@@ -17,26 +17,36 @@
package org.apache.inlong.sort.base.format;
+import com.google.common.collect.ImmutableMap;
import org.apache.flink.util.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
/**
* Dynamic schema format factory
*/
public class DynamicSchemaFormatFactory {
- public static final List<AbstractDynamicSchemaFormat<?>> SUPPORT_FORMATS =
- new ArrayList<AbstractDynamicSchemaFormat<?>>() {
+ public static Map<String, Function<Map<String, String>, AbstractDynamicSchemaFormat>> SUPPORT_FORMATS =
+ ImmutableMap.of(
+ "canal-json", props -> new CanalJsonDynamicSchemaFormat(props),
+ "debezium-json", props -> new DebeziumJsonDynamicSchemaFormat(props)
+ );
- private static final long serialVersionUID = 1L;
+ /**
+ * Get format from the format name, it only supports [canal-json|debezium-json] for now
+ *
+ * @param identifier The identifier of this format
+ * @return The dynamic format
+ */
+ @SuppressWarnings("rawtypes")
+ public static AbstractDynamicSchemaFormat getFormat(String identifier) {
+ return getFormat(identifier, new HashMap<>());
+ }
- {
- add(CanalJsonDynamicSchemaFormat.getInstance());
- add(DebeziumJsonDynamicSchemaFormat.getInstance());
- }
- };
/**
* Get format from the format name, it only supports [canal-json|debezium-json] for now
@@ -45,10 +55,11 @@ public class DynamicSchemaFormatFactory {
* @return The dynamic format
*/
@SuppressWarnings("rawtypes")
- public static AbstractDynamicSchemaFormat getFormat(String identifier) {
+ public static AbstractDynamicSchemaFormat getFormat(String identifier, Map<String, String> properties) {
Preconditions.checkNotNull(identifier, "The identifier is null");
- return Preconditions.checkNotNull(SUPPORT_FORMATS.stream().filter(s -> s.identifier().equals(identifier))
- .findFirst().orElse(null), "Unsupport dynamic schema format for:" + identifier);
+ return Optional.ofNullable(SUPPORT_FORMATS.get(identifier))
+ .orElseThrow(() ->
+ new UnsupportedOperationException("Unsupport dynamic schema format for:" + identifier))
+ .apply(properties);
}
-
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index a0d564d0c..e6fc75528 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -17,8 +17,9 @@
package org.apache.inlong.sort.base.format;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonToRowDataConverters;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -51,6 +52,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
+
/**
* Json dynamic format class
* This class main handle:
@@ -64,6 +67,7 @@ import java.util.regex.Matcher;
* 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
* the result of pared will be 'prefix_1_2_3_suffix'
*/
+@SuppressWarnings("LanguageDetectionInspection")
public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
/**
@@ -71,7 +75,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
*/
private static final Integer FIRST = 0;
- private static final Map<Integer, LogicalType> SQL_TYPE_2_ICEBERG_TYPE_MAPPING =
+ private static final Map<Integer, LogicalType> SQL_TYPE_2_FLINK_TYPE_MAPPING =
ImmutableMap.<Integer, LogicalType>builder()
.put(java.sql.Types.CHAR, new CharType())
.put(java.sql.Types.VARCHAR, new VarCharType())
@@ -94,12 +98,44 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
.put(java.sql.Types.BOOLEAN, new BooleanType())
.put(java.sql.Types.OTHER, new VarCharType())
.build();
+
+ private static final Map<Integer, LogicalType> SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
+ ImmutableMap.<Integer, LogicalType>builder()
+ .put(java.sql.Types.CHAR, new CharType())
+ .put(java.sql.Types.VARCHAR, new VarCharType())
+ .put(java.sql.Types.SMALLINT, new SmallIntType())
+ .put(java.sql.Types.INTEGER, new IntType())
+ .put(java.sql.Types.BIGINT, new BigIntType())
+ .put(java.sql.Types.REAL, new FloatType())
+ .put(java.sql.Types.DOUBLE, new DoubleType())
+ .put(java.sql.Types.FLOAT, new FloatType())
+ .put(java.sql.Types.DECIMAL, new DecimalType())
+ .put(java.sql.Types.NUMERIC, new DecimalType())
+ .put(java.sql.Types.BIT, new BooleanType())
+ .put(java.sql.Types.TIME, new VarCharType())
+ .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new LocalZonedTimestampType())
+ .put(java.sql.Types.TIMESTAMP, new LocalZonedTimestampType())
+ .put(java.sql.Types.BINARY, new BinaryType())
+ .put(java.sql.Types.VARBINARY, new VarBinaryType())
+ .put(java.sql.Types.BLOB, new VarBinaryType())
+ .put(java.sql.Types.DATE, new DateType())
+ .put(java.sql.Types.BOOLEAN, new BooleanType())
+ .put(java.sql.Types.OTHER, new VarCharType())
+ .build();
+
public final ObjectMapper objectMapper = new ObjectMapper();
protected final JsonToRowDataConverters rowDataConverters;
+ protected final boolean adaptSparkEngine;
- protected JsonDynamicSchemaFormat() {
+ public JsonDynamicSchemaFormat(Map<String, String> properties) {
+ ReadableConfig config = Configuration.fromMap(properties);
+ this.adaptSparkEngine = config.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
this.rowDataConverters =
- new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601);
+ new JsonToRowDataConverters(
+ false,
+ false,
+ TimestampFormat.ISO_8601,
+ adaptSparkEngine);
}
/**
@@ -286,8 +322,10 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
}
private LogicalType sqlType2FlinkType(int jdbcType) {
- if (SQL_TYPE_2_ICEBERG_TYPE_MAPPING.containsKey(jdbcType)) {
- return SQL_TYPE_2_ICEBERG_TYPE_MAPPING.get(jdbcType);
+ Map<Integer, LogicalType> typeMap = adaptSparkEngine
+ ? SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING : SQL_TYPE_2_FLINK_TYPE_MAPPING;
+ if (typeMap.containsKey(jdbcType)) {
+ return typeMap.get(jdbcType);
} else {
throw new IllegalArgumentException("Unsupported jdbcType: " + jdbcType);
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
new file mode 100644
index 000000000..da6128942
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
+
+/** Tool class used to convert from {@link JsonNode} to {@link RowData}. * */
+@Internal
+public class JsonToRowDataConverters implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Flag indicating whether to fail if a field is missing. */
+ private final boolean failOnMissingField;
+
+ /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+ private final boolean ignoreParseErrors;
+
+ /** Timestamp format specification which is used to parse timestamp. */
+ private final TimestampFormat timestampFormat;
+
+ /** Wherther adapt spark sql program. */
+ private final boolean adaptSpark;
+
+ public JsonToRowDataConverters(
+ boolean failOnMissingField,
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormat,
+ boolean adaptSpark) {
+ this.failOnMissingField = failOnMissingField;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.timestampFormat = timestampFormat;
+ this.adaptSpark = adaptSpark;
+ }
+
+ /**
+ * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL internal
+ * data structures.
+ */
+ @FunctionalInterface
+ public interface JsonToRowDataConverter extends Serializable {
+ Object convert(JsonNode jsonNode);
+ }
+
+ /** Creates a runtime converter which is null safe. */
+ public JsonToRowDataConverter createConverter(LogicalType type) {
+ return wrapIntoNullableConverter(createNotNullConverter(type));
+ }
+
+ /** Creates a runtime converter which assuming input object is not null. */
+ private JsonToRowDataConverter createNotNullConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return jsonNode -> null;
+ case BOOLEAN:
+ return this::convertToBoolean;
+ case TINYINT:
+ return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+ case SMALLINT:
+ return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return this::convertToInt;
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return this::convertToLong;
+ case DATE:
+ return this::convertToDate;
+ case TIME_WITHOUT_TIME_ZONE:
+ return this::convertToTime;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return this::convertToTimestamp;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if (adaptSpark) {
+ return jsonNode -> {
+ try {
+ return convertToTimestampWithLocalZone(jsonNode);
+ } catch (DateTimeParseException e) {
+ return convertToTimestamp(jsonNode);
+ }
+ };
+ }
+ return this::convertToTimestampWithLocalZone;
+ case FLOAT:
+ return this::convertToFloat;
+ case DOUBLE:
+ return this::convertToDouble;
+ case CHAR:
+ case VARCHAR:
+ return this::convertToString;
+ case BINARY:
+ case VARBINARY:
+ return this::convertToBytes;
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ARRAY:
+ return createArrayConverter((ArrayType) type);
+ case MAP:
+ MapType mapType = (MapType) type;
+ return createMapConverter(
+ mapType.asSummaryString(), mapType.getKeyType(), mapType.getValueType());
+ case MULTISET:
+ MultisetType multisetType = (MultisetType) type;
+ return createMapConverter(
+ multisetType.asSummaryString(),
+ multisetType.getElementType(),
+ new IntType());
+ case ROW:
+ return createRowConverter((RowType) type);
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private boolean convertToBoolean(JsonNode jsonNode) {
+ if (jsonNode.isBoolean()) {
+ // avoid redundant toString and parseBoolean, for better performance
+ return jsonNode.asBoolean();
+ } else {
+ return Boolean.parseBoolean(jsonNode.asText().trim());
+ }
+ }
+
+ private int convertToInt(JsonNode jsonNode) {
+ if (jsonNode.canConvertToInt()) {
+ // avoid redundant toString and parseInt, for better performance
+ return jsonNode.asInt();
+ } else {
+ return Integer.parseInt(jsonNode.asText().trim());
+ }
+ }
+
+ private long convertToLong(JsonNode jsonNode) {
+ if (jsonNode.canConvertToLong()) {
+ // avoid redundant toString and parseLong, for better performance
+ return jsonNode.asLong();
+ } else {
+ return Long.parseLong(jsonNode.asText().trim());
+ }
+ }
+
+ private double convertToDouble(JsonNode jsonNode) {
+ if (jsonNode.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return jsonNode.asDouble();
+ } else {
+ return Double.parseDouble(jsonNode.asText().trim());
+ }
+ }
+
+ private float convertToFloat(JsonNode jsonNode) {
+ if (jsonNode.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return (float) jsonNode.asDouble();
+ } else {
+ return Float.parseFloat(jsonNode.asText().trim());
+ }
+ }
+
+ private int convertToDate(JsonNode jsonNode) {
+ LocalDate date = ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+ return (int) date.toEpochDay();
+ }
+
+ private int convertToTime(JsonNode jsonNode) {
+ TemporalAccessor parsedTime = SQL_TIME_FORMAT.parse(jsonNode.asText());
+ LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+ // get number of milliseconds of the day
+ return localTime.toSecondOfDay() * 1000;
+ }
+
+ private TimestampData convertToTimestamp(JsonNode jsonNode) {
+ TemporalAccessor parsedTimestamp;
+ switch (timestampFormat) {
+ case SQL:
+ parsedTimestamp = SQL_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+ break;
+ case ISO_8601:
+ parsedTimestamp = ISO8601_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+ break;
+ default:
+ throw new TableException(
+ String.format(
+ "Unsupported timestamp format '%s'. Validator should have checked that.",
+ timestampFormat));
+ }
+ LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+ LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+
+ return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
+ }
+
+ private TimestampData convertToTimestampWithLocalZone(JsonNode jsonNode) {
+ TemporalAccessor parsedTimestampWithLocalZone;
+ switch (timestampFormat) {
+ case SQL:
+ parsedTimestampWithLocalZone =
+ SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+ break;
+ case ISO_8601:
+ parsedTimestampWithLocalZone =
+ ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+ break;
+ default:
+ throw new TableException(
+ String.format(
+ "Unsupported timestamp format '%s'. Validator should have checked that.",
+ timestampFormat));
+ }
+ LocalTime localTime = parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
+ LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
+
+ return TimestampData.fromInstant(
+ LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC));
+ }
+
+ private StringData convertToString(JsonNode jsonNode) {
+ if (jsonNode.isContainerNode()) {
+ return StringData.fromString(jsonNode.toString());
+ } else {
+ return StringData.fromString(jsonNode.asText());
+ }
+ }
+
+ private byte[] convertToBytes(JsonNode jsonNode) {
+ try {
+ return jsonNode.binaryValue();
+ } catch (IOException e) {
+ throw new JsonParseException("Unable to deserialize byte array.", e);
+ }
+ }
+
+ private JsonToRowDataConverter createDecimalConverter(DecimalType decimalType) {
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return jsonNode -> {
+ BigDecimal bigDecimal;
+ if (jsonNode.isBigDecimal()) {
+ bigDecimal = jsonNode.decimalValue();
+ } else {
+ bigDecimal = new BigDecimal(jsonNode.asText());
+ }
+ return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+ };
+ }
+
+ private JsonToRowDataConverter createArrayConverter(ArrayType arrayType) {
+ JsonToRowDataConverter elementConverter = createConverter(arrayType.getElementType());
+ final Class<?> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+ return jsonNode -> {
+ final ArrayNode node = (ArrayNode) jsonNode;
+ final Object[] array = (Object[]) Array.newInstance(elementClass, node.size());
+ for (int i = 0; i < node.size(); i++) {
+ final JsonNode innerNode = node.get(i);
+ array[i] = elementConverter.convert(innerNode);
+ }
+ return new GenericArrayData(array);
+ };
+ }
+
+ private JsonToRowDataConverter createMapConverter(
+ String typeSummary, LogicalType keyType, LogicalType valueType) {
+ if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new UnsupportedOperationException(
+ "JSON format doesn't support non-string as key type of map. "
+ + "The type is: "
+ + typeSummary);
+ }
+ final JsonToRowDataConverter keyConverter = createConverter(keyType);
+ final JsonToRowDataConverter valueConverter = createConverter(valueType);
+
+ return jsonNode -> {
+ Iterator<Entry<String, JsonNode>> fields = jsonNode.fields();
+ Map<Object, Object> result = new HashMap<>();
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = fields.next();
+ Object key = keyConverter.convert(TextNode.valueOf(entry.getKey()));
+ Object value = valueConverter.convert(entry.getValue());
+ result.put(key, value);
+ }
+ return new GenericMapData(result);
+ };
+ }
+
+ public JsonToRowDataConverter createRowConverter(RowType rowType) {
+ final JsonToRowDataConverter[] fieldConverters =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .map(this::createConverter)
+ .toArray(JsonToRowDataConverter[]::new);
+ final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+ return jsonNode -> {
+ ObjectNode node = (ObjectNode) jsonNode;
+ int arity = fieldNames.length;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ JsonNode field = node.get(fieldName);
+ Object convertedField = convertField(fieldConverters[i], fieldName, field);
+ row.setField(i, convertedField);
+ }
+ return row;
+ };
+ }
+
+ private Object convertField(
+ JsonToRowDataConverter fieldConverter, String fieldName, JsonNode field) {
+ if (field == null) {
+ if (failOnMissingField) {
+ throw new JsonParseException("Could not find field with name '" + fieldName + "'.");
+ } else {
+ return null;
+ }
+ } else {
+ return fieldConverter.convert(field);
+ }
+ }
+
+ private JsonToRowDataConverter wrapIntoNullableConverter(
+ JsonToRowDataConverter converter) {
+ return jsonNode -> {
+ if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
+ return null;
+ }
+ try {
+ return converter.convert(jsonNode);
+ } catch (Throwable t) {
+ if (!ignoreParseErrors) {
+ throw t;
+ }
+ return null;
+ }
+ };
+ }
+
+ /** Exception which refers to parse errors in converters. */
+ private static final class JsonParseException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public JsonParseException(String message) {
+ super(message);
+ }
+
+ public JsonParseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 77c924b95..10bc3f3d1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -18,11 +18,14 @@
package org.apache.inlong.sort.base.sink;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.Map;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE;
import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.TRY_IT_BEST;
@@ -37,6 +40,8 @@ public class MultipleSinkOption implements Serializable {
private String format;
+ private boolean sparkEngineEnable;
+
private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private String databasePattern;
@@ -44,10 +49,12 @@ public class MultipleSinkOption implements Serializable {
private String tablePattern;
public MultipleSinkOption(String format,
+ boolean sparkEngineEnable,
SchemaUpdateExceptionPolicy schemaUpdatePolicy,
String databasePattern,
String tablePattern) {
this.format = format;
+ this.sparkEngineEnable = sparkEngineEnable;
this.schemaUpdatePolicy = schemaUpdatePolicy;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
@@ -57,6 +64,15 @@ public class MultipleSinkOption implements Serializable {
return format;
}
+ public boolean isSparkEngineEnable() {
+ return sparkEngineEnable;
+ }
+
+ public Map<String, String> getFormatOption() {
+ return ImmutableMap.of(
+ SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK.key(), String.valueOf(isSparkEngineEnable()));
+ }
+
public SchemaUpdateExceptionPolicy getSchemaUpdatePolicy() {
return schemaUpdatePolicy;
}
@@ -75,6 +91,7 @@ public class MultipleSinkOption implements Serializable {
public static class Builder {
private String format;
+ private boolean sparkEngineEnable;
private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private String databasePattern;
private String tablePattern;
@@ -84,6 +101,11 @@ public class MultipleSinkOption implements Serializable {
return this;
}
+ public MultipleSinkOption.Builder withSparkEngineEnable(boolean sparkEngineEnable) {
+ this.sparkEngineEnable = sparkEngineEnable;
+ return this;
+ }
+
public MultipleSinkOption.Builder withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
this.schemaUpdatePolicy = schemaUpdatePolicy;
return this;
@@ -100,7 +122,7 @@ public class MultipleSinkOption implements Serializable {
}
public MultipleSinkOption build() {
- return new MultipleSinkOption(format, schemaUpdatePolicy, databasePattern, tablePattern);
+ return new MultipleSinkOption(format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern);
}
}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index dc7ce5ec8..29dbcb6a1 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
* Test for {@link CanalJsonDynamicSchemaFormat}
*/
public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+ private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("canal-json");
@Override
protected String getSource() {
@@ -91,6 +92,11 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
return expectedValues;
}
+ @Override
+ protected AbstractDynamicSchemaFormat<JsonNode> getDynamicSchemaFormat() {
+ return schemaFormat;
+ }
+
@Test
@SuppressWarnings({"unchecked"})
public void testExtractPrimaryKey() throws IOException {
@@ -119,10 +125,4 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
5.18f));
Assert.assertEquals(values, rowDataList);
}
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
- return CanalJsonDynamicSchemaFormat.getInstance();
- }
}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
index 5e555a657..c0eaface9 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
* Test for {@link DebeziumJsonDynamicSchemaFormat}
*/
public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+ private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json");
@Override
protected String getSource() {
@@ -98,6 +99,6 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
- return DebeziumJsonDynamicSchemaFormat.getInstance();
+ return schemaFormat;
}
}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
index fb268b9a7..d4a3dd188 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
* Test for {@link DebeziumJsonDynamicSchemaFormat}
*/
public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+ private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json");
@Override
protected String getSource() {
@@ -270,6 +271,6 @@ public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchema
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
- return DebeziumJsonDynamicSchemaFormat.getInstance();
+ return schemaFormat;
}
}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index f3c263f93..30ef92b69 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -35,16 +35,13 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.utils.TableSchemaUtils;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import java.time.Duration;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.stream.Collectors;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -332,8 +329,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
+ "is not allowed blank when the option 'sink.multiple.enable' is 'true'");
}
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
- List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map(
- AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList());
+ Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
if (!supportFormats.contains(sinkMultipleFormat)) {
throw new ValidationException(String.format(
"Unsupported value '%s' for '%s'. "
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 4852bca33..040e73df6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -55,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
/**
@@ -241,6 +242,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
options.add(SINK_MULTIPLE_DATABASE_PATTERN);
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+ options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
return options;
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ee7cf2c89..67ca5fdf2 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -49,6 +49,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
/**
@@ -104,6 +105,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.multipleSink(tableOptions.get(SINK_MULTIPLE_ENABLE))
.multipleSinkOption(MultipleSinkOption.builder()
.withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT))
+ .withSparkEngineEnable(tableOptions.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK))
.withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
.withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index a0a9092c6..378f040d9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -94,7 +94,9 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
this.catalog = catalogLoader.loadCatalog();
this.asNamespaceCatalog =
catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
- this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+ this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(
+ multipleSinkOption.getFormat(), multipleSinkOption.getFormatOption());
+
this.processingTimeService = getRuntimeContext().getProcessingTimeService();
processingTimeService.registerTimer(
processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 480d55948..69716dfb6 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -51,7 +51,6 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.kafka.KafkaDynamicSink;
import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
@@ -66,7 +65,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
@@ -440,8 +438,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
if (valueEncodingFormat instanceof RawFormatSerializationSchema
&& StringUtils.isNotBlank(sinkMultipleFormat)) {
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
- List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map(
- AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList());
+ Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
if (!supportFormats.contains(sinkMultipleFormat)) {
throw new ValidationException(String.format(
"Unsupported value '%s' for '%s'. "