You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/15 02:15:33 UTC
[incubator-seatunnel] branch api-draft updated: [api-draft][formats] json format (#2014)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 1673dcebb [api-draft][formats] json format (#2014)
1673dcebb is described below
commit 1673dcebb5d31a721cbe47fd1f4c8af7e9de6de2
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Wed Jun 15 10:15:28 2022 +0800
[api-draft][formats] json format (#2014)
---
pom.xml | 1 +
.../api/serialization/DeserializationSchema.java | 44 ++++
.../api/serialization/SerializationSchema.java | 30 +++
.../api/table/connector/DeserializationFormat.java | 39 +++
.../api/table/connector/SerializationFormat.java | 25 ++
.../factory/DeserializationFormatFactory.java | 24 ++
.../table/factory/SerializationFormatFactory.java | 24 ++
seatunnel-formats/pom.xml | 33 +++
seatunnel-formats/seatunnel-format-json/pom.xml | 39 +++
.../format/json/JsonDeserializationSchema.java | 122 +++++++++
.../seatunnel/format/json/JsonToRowConverters.java | 281 +++++++++++++++++++++
11 files changed, 662 insertions(+)
diff --git a/pom.xml b/pom.xml
index a858f5b91..0e479d784 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<module>seatunnel-api</module>
<module>seatunnel-translation</module>
<module>seatunnel-plugin-discovery</module>
+ <module>seatunnel-formats</module>
</modules>
<properties>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
new file mode 100644
index 000000000..0260d733a
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.serialization;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Converts raw byte messages into records of type {@code T}.
+ *
+ * <p>Implementations must be {@link Serializable}.
+ *
+ * @param <T> the type of the records produced by this schema
+ */
+public interface DeserializationSchema<T> extends Serializable {
+
+    /**
+     * Deserializes a single byte message into one record.
+     *
+     * @param message the message, as a byte array
+     * @return the deserialized record, or {@code null} if the message cannot be deserialized
+     * @throws IOException if deserialization fails
+     */
+    T deserialize(byte[] message) throws IOException;
+
+    /**
+     * Deserializes a byte message and emits the result to {@code out}.
+     *
+     * <p>The default implementation delegates to {@link #deserialize(byte[])} and emits at most one
+     * record; a {@code null} result is skipped instead of being forwarded to the collector.
+     *
+     * @param message the message, as a byte array
+     * @param out the collector that receives the deserialized record
+     * @throws IOException if deserialization fails
+     */
+    default void deserialize(byte[] message, Collector<T> out) throws IOException {
+        T result = deserialize(message);
+        if (result != null) {
+            out.collect(result);
+        }
+    }
+
+    /**
+     * Returns the data type of the records produced by this schema.
+     *
+     * @return the produced data type
+     */
+    SeaTunnelDataType<T> getProducedType();
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/SerializationSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/SerializationSchema.java
new file mode 100644
index 000000000..53f4c7293
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/SerializationSchema.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.serialization;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * Converts a {@link SeaTunnelRow} into its byte representation.
+ *
+ * <p>Extends {@link java.io.Serializable} for parity with {@code DeserializationSchema}, so schema
+ * instances can be serialized together with the component that owns them.
+ */
+public interface SerializationSchema extends java.io.Serializable {
+    /**
+     * Serializes the incoming row to bytes.
+     *
+     * @param element the incoming row to be serialized
+     * @return the serialized row, as a byte array
+     */
+    byte[] serialize(SeaTunnelRow element);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/DeserializationFormat.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/DeserializationFormat.java
new file mode 100644
index 000000000..6a3cb0ad5
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/DeserializationFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.connector;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decoding format: creates the {@link DeserializationSchema} used to turn bytes into records and
+ * can optionally expose metadata that is readable alongside the decoded data.
+ */
+public interface DeserializationFormat {
+
+    /**
+     * Creates the schema used to decode raw bytes.
+     *
+     * <p>NOTE(review): this returns a raw {@code DeserializationSchema}; consider parameterizing it.
+     *
+     * @return a new deserialization schema
+     */
+    DeserializationSchema createDeserializationSchema();
+
+    /**
+     * Lists the metadata keys this format can read, mapped to their data types.
+     *
+     * @return the readable metadata; empty by default, meaning the format exposes no metadata
+     */
+    default Map<String, SeaTunnelDataType<?>> listReadableMetadata() {
+        final Map<String, SeaTunnelDataType<?>> noMetadata = Collections.emptyMap();
+        return noMetadata;
+    }
+
+    /**
+     * Applies the requested metadata keys to this format.
+     *
+     * <p>The default implementation always throws; a format that supports metadata is expected to
+     * override it.
+     *
+     * @param metadataKeys the metadata keys to apply
+     * @param dataType the data type associated with the metadata keys
+     * @throws UnsupportedOperationException always, unless overridden
+     */
+    default void applyReadableMetadata(List<String> metadataKeys, SeaTunnelDataType<?> dataType) {
+        final String reason = "A decoding format must override this method to apply metadata keys.";
+        throw new UnsupportedOperationException(reason);
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SerializationFormat.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SerializationFormat.java
new file mode 100644
index 000000000..bc0c5c052
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SerializationFormat.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.connector;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+
+/**
+ * An encoding format: creates the {@link SerializationSchema} used to turn rows into bytes.
+ */
+public interface SerializationFormat {
+
+    /**
+     * Creates the schema used to encode rows into bytes.
+     *
+     * @return a new serialization schema
+     */
+    SerializationSchema createSerializationSchema();
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/DeserializationFormatFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/DeserializationFormatFactory.java
new file mode 100644
index 000000000..29285f641
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/DeserializationFormatFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+
+/**
+ * {@link Factory} that builds a {@link DeserializationFormat} from a {@link TableFactoryContext}.
+ */
+public interface DeserializationFormatFactory extends Factory {
+
+    /**
+     * Creates a decoding format for the given context.
+     *
+     * @param context the table factory context
+     * @return the created deserialization format
+     */
+    DeserializationFormat createDeserializationFormat(TableFactoryContext context);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SerializationFormatFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SerializationFormatFactory.java
new file mode 100644
index 000000000..653bd03b3
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SerializationFormatFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+
+/**
+ * {@link Factory} that builds a {@link SerializationFormat} from a {@link TableFactoryContext}.
+ */
+public interface SerializationFormatFactory extends Factory {
+
+    /**
+     * Creates an encoding format for the given context.
+     *
+     * @param context the table factory context
+     * @return the created serialization format
+     */
+    SerializationFormat createSerializationFormat(TableFactoryContext context);
+}
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
new file mode 100644
index 000000000..11c938b35
--- /dev/null
+++ b/seatunnel-formats/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-formats</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>seatunnel-format-json</module>
+ </modules>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/pom.xml b/seatunnel-formats/seatunnel-format-json/pom.xml
new file mode 100644
index 000000000..7642c77e5
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-formats</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-format-json</artifactId>
+
+ <properties>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
new file mode 100644
index 000000000..3510daae9
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.CompositeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * {@link DeserializationSchema} that parses JSON messages into {@link SeaTunnelRow}s of a fixed
+ * {@link SeaTunnelRowType}.
+ *
+ * <p>Field values are converted by {@link JsonToRowConverters}. The options
+ * {@code failOnMissingField} and {@code ignoreParseErrors} are mutually exclusive.
+ */
+public class JsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    /** Flag indicating whether to fail if a field is missing. */
+    private final boolean failOnMissingField;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    /** The row type of the produced {@link SeaTunnelRow}. */
+    private final SeaTunnelRowType rowType;
+
+    /**
+     * Runtime converter that converts a parsed {@link JsonNode} into a {@link SeaTunnelRow},
+     * including all of its field values.
+     */
+    private final JsonToRowConverters.JsonToRowConverter runtimeConverter;
+
+    /** Object mapper for parsing the JSON. */
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates a JSON deserialization schema.
+     *
+     * @param failOnMissingField whether to fail when a field of {@code rowType} is absent
+     * @param ignoreParseErrors whether to return {@code null} instead of failing on parse errors
+     * @param rowType the row type of the produced rows; must not be {@code null}
+     * @throws IllegalArgumentException if both {@code failOnMissingField} and
+     *     {@code ignoreParseErrors} are enabled
+     * @throws NullPointerException if {@code rowType} is {@code null}
+     */
+    public JsonDeserializationSchema(boolean failOnMissingField,
+                                     boolean ignoreParseErrors,
+                                     SeaTunnelRowType rowType) {
+        if (ignoreParseErrors && failOnMissingField) {
+            throw new IllegalArgumentException(
+                "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
+        }
+        this.rowType = checkNotNull(rowType);
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.runtimeConverter =
+            new JsonToRowConverters(failOnMissingField, ignoreParseErrors).createConverter(this.rowType);
+
+        // When any (nested) field is DECIMAL, read JSON floating-point numbers as BigDecimal so
+        // decimal values are not first rounded through double.
+        if (hasDecimalType(this.rowType)) {
+            objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        }
+        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+    }
+
+    /** Returns whether {@code dataType} is DECIMAL or, recursively, has a DECIMAL child type. */
+    private static boolean hasDecimalType(SeaTunnelDataType<?> dataType) {
+        if (SqlType.of(dataType) == SqlType.DECIMAL) {
+            return true;
+        }
+        if (dataType instanceof CompositeType) {
+            CompositeType<?> compositeType = (CompositeType<?>) dataType;
+            for (SeaTunnelDataType<?> child : compositeType.getChildren()) {
+                if (hasDecimalType(child)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Parses {@code message} as JSON and converts it into a row.
+     *
+     * @param message the UTF-8 encoded JSON message; {@code null} yields {@code null}
+     * @return the converted row, or {@code null} if the message is {@code null}, is JSON
+     *     {@code null}, or cannot be parsed while {@code ignoreParseErrors} is enabled
+     * @throws IOException if the message cannot be parsed and {@code ignoreParseErrors} is disabled
+     */
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+        try {
+            return (SeaTunnelRow) runtimeConverter.convert(objectMapper.readTree(message));
+        } catch (Exception e) {
+            // Only exceptions are treated as parse errors; JVM Errors (e.g. OOM) propagate as-is.
+            if (ignoreParseErrors) {
+                return null;
+            }
+            // Decode explicitly as UTF-8 rather than with the platform default charset.
+            throw new IOException(
+                format("Failed to deserialize JSON '%s'.", new String(message, StandardCharsets.UTF_8)), e);
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return this.rowType;
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
new file mode 100644
index 000000000..e6c28490f
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.Arrays;
+
+/**
+ * Tool class used to convert from {@link JsonNode} to {@link SeaTunnelRow} and its field values.
+ *
+ * <p>A field absent from the JSON object yields {@code null}, or fails when
+ * {@code failOnMissingField} is set. A value that cannot be converted fails, or yields
+ * {@code null} when {@code ignoreParseErrors} is set.
+ */
+public class JsonToRowConverters implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Parser for TIME values: {@code HH:mm:ss} followed by an optional fraction of 0-9 digits. */
+    @SuppressWarnings("MagicNumber")
+    public static final DateTimeFormatter TIME_FORMAT =
+        new DateTimeFormatterBuilder()
+            .appendPattern("HH:mm:ss")
+            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+            .toFormatter();
+
+    /** Flag indicating whether to fail if a field is missing. */
+    private final boolean failOnMissingField;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    /**
+     * @param failOnMissingField whether a field absent from the JSON object is an error
+     * @param ignoreParseErrors whether values that fail to convert become {@code null}
+     */
+    public JsonToRowConverters(
+        boolean failOnMissingField,
+        boolean ignoreParseErrors) {
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+    }
+
+    /**
+     * Creates a runtime converter for {@code type} which is null safe: a {@code null} input, a JSON
+     * {@code null} and a missing node all convert to {@code null}.
+     */
+    public JsonToRowConverter createConverter(SeaTunnelDataType<?> type) {
+        return wrapIntoNullableConverter(createNotNullConverter(type));
+    }
+
+    /**
+     * Creates a runtime converter which assumes its input node is not null.
+     *
+     * @throws UnsupportedOperationException if {@code type} has no JSON conversion
+     */
+    @SuppressWarnings("unchecked")
+    private JsonToRowConverter createNotNullConverter(SeaTunnelDataType<?> type) {
+        SqlType sqlType = SqlType.of(type);
+        switch (sqlType) {
+            case ROW:
+                return createRowConverter((SeaTunnelRowType) type);
+            case NULL:
+                return jsonNode -> null;
+            case BOOLEAN:
+                return this::convertToBoolean;
+            case TINYINT:
+                return this::convertToByte;
+            case SMALLINT:
+                return this::convertToShort;
+            case INT:
+                return this::convertToInt;
+            case BIGINT:
+                return this::convertToLong;
+            case DATE:
+                return this::convertToLocalDate;
+            case TIME:
+                return this::convertToLocalTime;
+            case TIMESTAMP:
+                return this::convertToLocalDateTime;
+            case FLOAT:
+                return this::convertToFloat;
+            case DOUBLE:
+                return this::convertToDouble;
+            case STRING:
+                return this::convertToString;
+            case BYTES:
+                return this::convertToBytes;
+            case DECIMAL:
+                return createDecimalConverter((BasicType<BigDecimal>) type);
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private boolean convertToBoolean(JsonNode jsonNode) {
+        if (jsonNode.isBoolean()) {
+            // avoid redundant toString and parseBoolean, for better performance
+            return jsonNode.asBoolean();
+        } else {
+            return Boolean.parseBoolean(jsonNode.asText().trim());
+        }
+    }
+
+    private byte convertToByte(JsonNode jsonNode) {
+        return Byte.parseByte(jsonNode.asText().trim());
+    }
+
+    private short convertToShort(JsonNode jsonNode) {
+        return Short.parseShort(jsonNode.asText().trim());
+    }
+
+    // NOTE(review): for numeric nodes, Jackson's canConvertToInt()/asInt() may also accept and
+    // truncate fractional values -- confirm that is intended.
+    private int convertToInt(JsonNode jsonNode) {
+        if (jsonNode.canConvertToInt()) {
+            // avoid redundant toString and parseInt, for better performance
+            return jsonNode.asInt();
+        } else {
+            return Integer.parseInt(jsonNode.asText().trim());
+        }
+    }
+
+    private long convertToLong(JsonNode jsonNode) {
+        if (jsonNode.canConvertToLong()) {
+            // avoid redundant toString and parseLong, for better performance
+            return jsonNode.asLong();
+        } else {
+            return Long.parseLong(jsonNode.asText().trim());
+        }
+    }
+
+    private double convertToDouble(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return jsonNode.asDouble();
+        } else {
+            return Double.parseDouble(jsonNode.asText().trim());
+        }
+    }
+
+    private float convertToFloat(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return (float) jsonNode.asDouble();
+        } else {
+            return Float.parseFloat(jsonNode.asText().trim());
+        }
+    }
+
+    /** Parses an ISO-8601 local date such as {@code 2022-06-15}. */
+    private LocalDate convertToLocalDate(JsonNode jsonNode) {
+        return ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+    }
+
+    /** Parses a time using {@link #TIME_FORMAT}. */
+    private LocalTime convertToLocalTime(JsonNode jsonNode) {
+        TemporalAccessor parsedTime = TIME_FORMAT.parse(jsonNode.asText());
+        return parsedTime.query(TemporalQueries.localTime());
+    }
+
+    /** Parses an ISO-8601 local date-time such as {@code 2022-06-15T10:15:28}. */
+    private LocalDateTime convertToLocalDateTime(JsonNode jsonNode) {
+        TemporalAccessor parsedTimestamp = DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(jsonNode.asText());
+        LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+        LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+        return LocalDateTime.of(localDate, localTime);
+    }
+
+    /** Returns the text of a scalar node, or the JSON serialization of an object/array node. */
+    private String convertToString(JsonNode jsonNode) {
+        if (jsonNode.isContainerNode()) {
+            return jsonNode.toString();
+        } else {
+            return jsonNode.asText();
+        }
+    }
+
+    /** Delegates to {@link JsonNode#binaryValue()}, rethrowing I/O failures unchecked. */
+    private byte[] convertToBytes(JsonNode jsonNode) {
+        try {
+            return jsonNode.binaryValue();
+        } catch (IOException e) {
+            throw new JsonParseException("Unable to deserialize byte array.", e);
+        }
+    }
+
+    /**
+     * Creates a converter for DECIMAL values.
+     *
+     * <p>NOTE(review): {@code decimalType} is currently unused, so precision and scale are not
+     * enforced; the parsed value is returned unchanged.
+     */
+    private JsonToRowConverter createDecimalConverter(BasicType<BigDecimal> decimalType) {
+        return jsonNode -> {
+            if (jsonNode.isBigDecimal()) {
+                return jsonNode.decimalValue();
+            }
+            // Trim surrounding whitespace, consistently with the other numeric converters.
+            return new BigDecimal(jsonNode.asText().trim());
+        };
+    }
+
+    /**
+     * Creates a converter that maps a JSON object onto a {@link SeaTunnelRow} of {@code rowType},
+     * looking up each field by name.
+     */
+    public JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
+        final JsonToRowConverter[] fieldConverters =
+            Arrays.stream(rowType.getFieldTypes())
+                .map(this::createConverter)
+                .toArray(JsonToRowConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames();
+        final int arity = fieldNames.length;
+
+        return jsonNode -> {
+            if (!jsonNode.isObject()) {
+                throw new JsonParseException(
+                    "Expected a JSON object but got: " + jsonNode.getNodeType());
+            }
+            ObjectNode node = (ObjectNode) jsonNode;
+            SeaTunnelRow row = new SeaTunnelRow(arity);
+            for (int i = 0; i < arity; i++) {
+                String fieldName = fieldNames[i];
+                JsonNode field = node.get(fieldName);
+                try {
+                    row.setField(i, convertField(fieldConverters[i], fieldName, field));
+                } catch (RuntimeException e) {
+                    // Attach the failing field name; JVM Errors are not wrapped.
+                    throw new JsonParseException(
+                        String.format("Fail to deserialize at field: %s.", fieldName), e);
+                }
+            }
+            return row;
+        };
+    }
+
+    /**
+     * Converts one field value; {@code field == null} means the key is absent from the object.
+     *
+     * @throws JsonParseException if the field is absent and {@code failOnMissingField} is set
+     */
+    private Object convertField(
+        JsonToRowConverter fieldConverter, String fieldName, JsonNode field) {
+        if (field == null) {
+            if (failOnMissingField) {
+                throw new JsonParseException("Could not find field with name '" + fieldName + "'.");
+            } else {
+                return null;
+            }
+        } else {
+            return fieldConverter.convert(field);
+        }
+    }
+
+    /**
+     * Wraps {@code converter} so that null-like input yields {@code null}, and conversion failures
+     * yield {@code null} when {@code ignoreParseErrors} is set (otherwise they are rethrown).
+     */
+    private JsonToRowConverter wrapIntoNullableConverter(JsonToRowConverter converter) {
+        return jsonNode -> {
+            if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
+                return null;
+            }
+            try {
+                return converter.convert(jsonNode);
+            } catch (RuntimeException e) {
+                // convert() declares no checked exceptions; JVM Errors are never swallowed.
+                if (!ignoreParseErrors) {
+                    throw e;
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Runtime converter that converts a {@link JsonNode} into a SeaTunnel internal value, e.g. a
+     * {@link SeaTunnelRow} for ROW types or a {@link BigDecimal} for DECIMAL types.
+     */
+    @FunctionalInterface
+    public interface JsonToRowConverter extends Serializable {
+        Object convert(JsonNode jsonNode);
+    }
+
+    /** Exception which refers to parse errors in converters. */
+    private static final class JsonParseException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+
+        public JsonParseException(String message) {
+            super(message);
+        }
+
+        public JsonParseException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}