You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/18 09:20:05 UTC
[incubator-inlong] branch master updated: [INLONG-2524][Feature][Sort] Support deserialization of json, canal and avro formatted data (#2525)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7f8d471 [INLONG-2524][Feature][Sort] Support deserialization of json, canal and avro formatted data (#2525)
7f8d471 is described below
commit 7f8d4711e5ff1b1e8b0bc5931fb2e0c082f1facd
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Fri Feb 18 17:19:58 2022 +0800
[INLONG-2524][Feature][Sort] Support deserialization of json, canal and avro formatted data (#2525)
---
inlong-sort/pom.xml | 2 +-
.../inlong/sort/configuration/Constants.java | 2 -
.../inlong/sort/protocol/DataFlowStorageInfo.java | 6 +-
.../org/apache/inlong/sort/protocol/FieldInfo.java | 12 +-
.../AvroDeserializationInfo.java} | 27 +--
.../deserialization/CanalDeserializationInfo.java | 108 +++++++++++
.../deserialization/CsvDeserializationInfo.java | 2 +
.../deserialization/DeserializationInfo.java | 3 +
.../JsonDeserializationInfo.java} | 27 +--
.../deserialization/KvDeserializationInfo.java | 2 +
.../inlong/sort/protocol/sink/DorisSinkInfo.java | 2 -
.../inlong/sort/protocol/sink/HiveSinkInfo.java | 28 ++-
.../inlong/sort/protocol/sink/IcebergSinkInfo.java | 2 +
.../AvroDeserializationInfoTest.java} | 29 +--
.../CanalDeserializationInfoTest.java} | 34 ++--
.../JsonDeserializationInfoTest.java} | 29 +--
.../inlong/sort/formats/base/TableFormatUtils.java | 9 +
.../inlong/sort/formats/common/Constants.java} | 24 +--
.../inlong/sort/formats/common/DateFormatInfo.java | 9 +-
.../inlong/sort/formats/common/FormatInfo.java | 3 +-
.../common/LocalZonedTimestampFormatInfo.java} | 51 ++---
.../common/LocalZonedTimestampTypeInfo.java} | 35 ++--
.../inlong/sort/formats/common/TimeFormatInfo.java | 11 +-
.../sort/formats/common/TimestampFormatInfo.java | 9 +-
.../inlong/sort/formats/common/TypeInfo.java | 3 +-
.../common/LocalZonedTimestampFormatInfoTest.java} | 26 +--
inlong-sort/sort-single-tenant/pom.xml | 10 +
.../inlong/sort/singletenant/flink/Entrance.java | 68 +++++--
.../sort/singletenant/flink/SerializedRecord.java | 58 ++++++
.../clickhouse/ClickhouseRowSinkFunction.java | 2 +
.../CanalDeserializationSchemaBuilder.java | 137 ++++++++++++++
...stomDateFormatDeserializationSchemaWrapper.java | 125 ++++++++++++
.../deserialization/DeserializationFunction.java | 66 +++++++
.../DeserializationSchemaFactory.java | 96 ++++++++++
.../flink/deserialization/ListCollector.java} | 33 ++--
.../RowDataToRowDeserializationSchemaWrapper.java | 79 ++++++++
.../singletenant/flink/kafka/KafkaSinkBuilder.java | 49 +----
.../flink/pulsar/PulsarSourceBuilder.java | 78 ++++++++
...y.java => CanalSerializationSchemaBuilder.java} | 43 ++---
...CustomDateFormatSerializationSchemaWrapper.java | 90 +++++++++
.../RowToRowDataSerializationSchemaWrapper.java | 52 +++++
...actory.java => SerializationSchemaFactory.java} | 39 ++--
.../sort/singletenant/flink/utils/CommonUtils.java | 103 +++++++++-
.../deserialization/AvroDeserializationTest.java | 102 ++++++++++
.../deserialization/CanalDeserializationTest.java | 209 +++++++++++++++++++++
...DateFormatDeserializationSchemaWrapperTest.java | 65 +++++++
.../DeserializationFunctionTest.java | 53 ++++++
.../deserialization/JsonDeserializationTest.java | 112 +++++++++++
.../flink/kafka/KafkaSinkTestBase.java | 23 ++-
.../flink/kafka/KafkaSinkTestBaseForRow.java | 45 -----
.../flink/kafka/KafkaSinkTestBaseForRowData.java | 43 -----
.../flink/kafka/RowToAvroKafkaSinkTest.java | 8 +-
.../flink/kafka/RowToCanalKafkaSinkTest.java | 9 +-
.../flink/kafka/RowToJsonKafkaSinkTest.java | 9 +-
.../flink/kafka/RowToStringKafkaSinkTest.java | 9 +-
...omDateFormatSerializationSchemaWrapperTest.java | 66 +++++++
.../singletenant/flink/utils/CommonUtilsTest.java | 17 ++
57 files changed, 1963 insertions(+), 430 deletions(-)
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index d242dd2..baf2db4 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -55,7 +55,7 @@
<tubemq.version>${project.version}</tubemq.version>
<commons-lang3.version>3.3.2</commons-lang3.version>
<clickhouse-jdbc.version>0.3.0</clickhouse-jdbc.version>
- <flink.jackson.version>2.9.8-7.0</flink.jackson.version>
+ <flink.jackson.version>2.12.1-13.0</flink.jackson.version>
<jsr.version>3.0.2</jsr.version>
<snappy.version>1.1.4</snappy.version>
<hadoop.version>2.8.5</hadoop.version>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index a0ad42b..c6e8067 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -57,8 +57,6 @@ public class Constants {
public static final String DESERIALIZATION_SCHEMA_UID = "deserialization_schema_uid";
- public static final String CONVERTER_UID = "converter_uid";
-
public static final String SINK_UID = "sink_uid";
/**
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java
index 95d99d1..4ef0d63 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java
@@ -18,11 +18,15 @@
package org.apache.inlong.sort.protocol;
import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
import java.util.Objects;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-public class DataFlowStorageInfo {
+public class DataFlowStorageInfo implements Serializable {
+
+ private static final long serialVersionUID = -2785142086976967367L;
public enum StorageType {
ZK,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 42abe16..f8b1cb9 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -22,14 +22,18 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
import java.util.Objects;
-public class FieldInfo {
+public class FieldInfo implements Serializable {
+
+ private static final long serialVersionUID = 5871970550803344673L;
+
@JsonProperty("name")
private final String name;
@JsonProperty("format_info")
- private final FormatInfo formatInfo;
+ private FormatInfo formatInfo;
@JsonCreator
public FieldInfo(
@@ -47,6 +51,10 @@ public class FieldInfo {
return formatInfo;
}
+ public void setFormatInfo(FormatInfo formatInfo) {
+ this.formatInfo = formatInfo;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfo.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfo.java
index 5ab6c71..2c5f011 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfo.java
@@ -16,28 +16,19 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+public class AvroDeserializationInfo implements DeserializationInfo {
-public class IcebergSinkInfo extends SinkInfo {
+ private static final long serialVersionUID = -5344203248610337314L;
- @JsonProperty("table_location")
- private final String tableLocation;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
- @JsonCreator
- public IcebergSinkInfo(
- @JsonProperty("fields") FieldInfo[] fields,
- @JsonProperty("table_location") String tableLocation) {
- super(fields);
- this.tableLocation = Preconditions.checkNotNull(tableLocation);
+ return o != null && getClass() == o.getClass();
}
- @JsonProperty("table_location")
- public String getTableLocation() {
- return tableLocation;
- }
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
new file mode 100644
index 0000000..a6ac92c
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.deserialization;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class CanalDeserializationInfo implements DeserializationInfo {
+
+ private static final long serialVersionUID = -5344203248610337314L;
+
+ @JsonProperty("database")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final String database;
+
+ @JsonProperty("table")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final String table;
+
+ @JsonProperty("ignore_parse_errors")
+ private final boolean ignoreParseErrors;
+
+ @JsonProperty("timestamp_format_standard")
+ private final String timestampFormatStandard;
+
+ @JsonProperty("include_metadata")
+ private final boolean includeMetadata;
+
+ @JsonCreator
+ public CanalDeserializationInfo(
+ @JsonProperty("database") String database,
+ @JsonProperty("table") String table,
+ @JsonProperty("ignore_parse_errors") boolean ignoreParseErrors,
+ @JsonProperty("timestamp_format_standard") String timestampFormatStandard,
+ @JsonProperty("include_metadata") boolean includeMetadata) {
+ this.database = database;
+ this.table = table;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.timestampFormatStandard = timestampFormatStandard;
+ this.includeMetadata = includeMetadata;
+ }
+
+ @JsonProperty("database")
+ public String getDatabase() {
+ return database;
+ }
+
+ @JsonProperty("table")
+ public String getTable() {
+ return table;
+ }
+
+ @JsonProperty("ignore_parse_errors")
+ public boolean isIgnoreParseErrors() {
+ return ignoreParseErrors;
+ }
+
+ @JsonProperty("timestamp_format_standard")
+ public String getTimestampFormatStandard() {
+ return timestampFormatStandard;
+ }
+
+ @JsonProperty("include_metadata")
+ public boolean isIncludeMetadata() {
+ return includeMetadata;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CanalDeserializationInfo that = (CanalDeserializationInfo) o;
+ return ignoreParseErrors == that.ignoreParseErrors
+ && includeMetadata == that.includeMetadata
+ && Objects.equals(database, that.database)
+ && Objects.equals(table, that.table)
+ && Objects.equals(timestampFormatStandard, that.timestampFormatStandard);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(database, table, ignoreParseErrors, timestampFormatStandard, includeMetadata);
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index fafae28..2c3fc0b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -25,6 +25,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
*/
public class CsvDeserializationInfo implements DeserializationInfo {
+ private static final long serialVersionUID = -5035426390567887081L;
+
private final char splitter;
// TODO: support mapping index to field
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index 844f506..e1599ff 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -32,6 +32,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
property = "type")
@JsonSubTypes({
@Type(value = CsvDeserializationInfo.class, name = "csv"),
+ @Type(value = AvroDeserializationInfo.class, name = "avro"),
+ @Type(value = JsonDeserializationInfo.class, name = "json"),
+ @Type(value = CanalDeserializationInfo.class, name = "canal"),
@Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlongmsg_csv"),
@Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlongmsg_csv2"),
@Type(value = InLongMsgKvDeserializationInfo.class, name = "inlongmsg_kv"),
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
index 5ab6c71..6c73d9a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
@@ -16,28 +16,19 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+public class JsonDeserializationInfo implements DeserializationInfo {
-public class IcebergSinkInfo extends SinkInfo {
+ private static final long serialVersionUID = -5344203248610337314L;
- @JsonProperty("table_location")
- private final String tableLocation;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
- @JsonCreator
- public IcebergSinkInfo(
- @JsonProperty("fields") FieldInfo[] fields,
- @JsonProperty("table_location") String tableLocation) {
- super(fields);
- this.tableLocation = Preconditions.checkNotNull(tableLocation);
+ return o != null && getClass() == o.getClass();
}
- @JsonProperty("table_location")
- public String getTableLocation() {
- return tableLocation;
- }
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index aadde75..bed2859 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -27,6 +27,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
*/
public class KvDeserializationInfo implements DeserializationInfo {
+ private static final long serialVersionUID = 1976031542480774581L;
+
private final char entrySplitter;
private final char kvSplitter;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java
index 0b0d892..d6f8b05 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java
@@ -24,8 +24,6 @@ import org.apache.inlong.sort.protocol.FieldInfo;
public class DorisSinkInfo extends SinkInfo {
-
-
private static final long serialVersionUID = 1L;
@JsonProperty("table_identifier")
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
index 89ee544..381a1f1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
@@ -134,7 +134,10 @@ public class HiveSinkInfo extends SinkInfo {
@JsonSubTypes({
@Type(value = HiveTimePartitionInfo.class, name = "time"),
@Type(value = HiveFieldPartitionInfo.class, name = "field")})
- public abstract static class HivePartitionInfo {
+ public abstract static class HivePartitionInfo implements Serializable {
+
+ private static final long serialVersionUID = -4276796328049383208L;
+
@JsonProperty("field_name")
private final String fieldName;
@@ -151,6 +154,8 @@ public class HiveSinkInfo extends SinkInfo {
public static class HiveTimePartitionInfo extends HivePartitionInfo {
+ private static final long serialVersionUID = -2475470848828020684L;
+
@JsonProperty("date_format")
private final String format;
@@ -169,6 +174,8 @@ public class HiveSinkInfo extends SinkInfo {
public static class HiveFieldPartitionInfo extends HivePartitionInfo {
+ private static final long serialVersionUID = 9208133177416395986L;
+
public HiveFieldPartitionInfo(
@JsonProperty("field_name") String fieldName) {
super(fieldName);
@@ -186,10 +193,12 @@ public class HiveSinkInfo extends SinkInfo {
@Type(value = OrcFileFormat.class, name = "orc"),
@Type(value = SequenceFileFormat.class, name = "sequence"),
@Type(value = ParquetFileFormat.class, name = "parquet"),})
- public interface HiveFileFormat {
+ public interface HiveFileFormat extends Serializable {
}
- public static class TextFileFormat implements HiveFileFormat, Serializable {
+ public static class TextFileFormat implements HiveFileFormat {
+
+ private static final long serialVersionUID = 522000219325150443L;
@JsonProperty("splitter")
private final Character splitter;
@@ -221,7 +230,9 @@ public class HiveSinkInfo extends SinkInfo {
}
}
- public static class OrcFileFormat implements HiveFileFormat, Serializable {
+ public static class OrcFileFormat implements HiveFileFormat {
+
+ private static final long serialVersionUID = -6483139337919483030L;
@JsonProperty("batch_size")
private final int batchSize;
@@ -237,7 +248,9 @@ public class HiveSinkInfo extends SinkInfo {
}
}
- public static class SequenceFileFormat implements HiveFileFormat, Serializable {
+ public static class SequenceFileFormat implements HiveFileFormat {
+
+ private static final long serialVersionUID = 263836241053911625L;
@JsonProperty("splitter")
private final Character splitter;
@@ -263,9 +276,12 @@ public class HiveSinkInfo extends SinkInfo {
}
}
- public static class ParquetFileFormat implements HiveFileFormat, Serializable {
+ public static class ParquetFileFormat implements HiveFileFormat {
+
+ private static final long serialVersionUID = 3400568099604670179L;
public ParquetFileFormat() {
}
+
}
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
index 5ab6c71..a7d4ff8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
@@ -25,6 +25,8 @@ import org.apache.inlong.sort.protocol.FieldInfo;
public class IcebergSinkInfo extends SinkInfo {
+ private static final long serialVersionUID = 3682587731889008343L;
+
@JsonProperty("table_location")
private final String tableLocation;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfoTest.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfoTest.java
index 5ab6c71..6002786 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfoTest.java
@@ -16,28 +16,19 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
-public class IcebergSinkInfo extends SinkInfo {
+public class AvroDeserializationInfoTest extends ProtocolBaseTest {
- @JsonProperty("table_location")
- private final String tableLocation;
-
- @JsonCreator
- public IcebergSinkInfo(
- @JsonProperty("fields") FieldInfo[] fields,
- @JsonProperty("table_location") String tableLocation) {
- super(fields);
- this.tableLocation = Preconditions.checkNotNull(tableLocation);
+ @Override
+ public void init() {
+ expectedObject = new AvroDeserializationInfo();
+ expectedJson = "{\"type\":\"avro\"}";
+ equalObj1 = expectedObject;
+ equalObj2 = new AvroDeserializationInfo();
+ unequalObj = "";
}
- @JsonProperty("table_location")
- public String getTableLocation() {
- return tableLocation;
- }
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfoTest.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfoTest.java
index 5ab6c71..613d561 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfoTest.java
@@ -16,28 +16,24 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
-public class IcebergSinkInfo extends SinkInfo {
+public class CanalDeserializationInfoTest extends ProtocolBaseTest {
- @JsonProperty("table_location")
- private final String tableLocation;
-
- @JsonCreator
- public IcebergSinkInfo(
- @JsonProperty("fields") FieldInfo[] fields,
- @JsonProperty("table_location") String tableLocation) {
- super(fields);
- this.tableLocation = Preconditions.checkNotNull(tableLocation);
+ @Override
+ public void init() {
+ expectedObject = new CanalDeserializationInfo(null, null, false, "SQL", true);
+ expectedJson = "{\n"
+ + " \"type\" : \"canal\",\n"
+ + " \"ignore_parse_errors\" : false,\n"
+ + " \"timestamp_format_standard\" : \"SQL\",\n"
+ + " \"include_metadata\" : true\n"
+ + "}";
+ equalObj1 = expectedObject;
+ equalObj2 = new CanalDeserializationInfo(null, null, false, "SQL", true);
+ unequalObj = new CanalDeserializationInfo(null, null, false, "SQL", false);
}
- @JsonProperty("table_location")
- public String getTableLocation() {
- return tableLocation;
- }
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfoTest.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfoTest.java
index 5ab6c71..1c13903 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfoTest.java
@@ -16,28 +16,19 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
-public class IcebergSinkInfo extends SinkInfo {
+public class JsonDeserializationInfoTest extends ProtocolBaseTest {
- @JsonProperty("table_location")
- private final String tableLocation;
-
- @JsonCreator
- public IcebergSinkInfo(
- @JsonProperty("fields") FieldInfo[] fields,
- @JsonProperty("table_location") String tableLocation) {
- super(fields);
- this.tableLocation = Preconditions.checkNotNull(tableLocation);
+ @Override
+ public void init() {
+ expectedObject = new JsonDeserializationInfo();
+ expectedJson = "{\"type\":\"json\"}";
+ equalObj1 = expectedObject;
+ equalObj2 = new JsonDeserializationInfo();
+ unequalObj = "";
}
- @JsonProperty("table_location")
- public String getTableLocation() {
- return tableLocation;
- }
}
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index 40de211..f53121b 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.NullType;
@@ -77,6 +78,8 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.FormatUtils;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.IntTypeInfo;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampTypeInfo;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.LongTypeInfo;
import org.apache.inlong.sort.formats.common.MapFormatInfo;
@@ -276,6 +279,8 @@ public class TableFormatUtils {
return new TimeFormatInfo();
} else if (logicalType instanceof TimestampType) {
return new TimestampFormatInfo();
+ } else if (logicalType instanceof LocalZonedTimestampType) {
+ return new LocalZonedTimestampFormatInfo();
} else if (logicalType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) logicalType;
LogicalType elementType = arrayType.getElementType();
@@ -344,6 +349,8 @@ public class TableFormatUtils {
return new DateType();
} else if (formatInfo instanceof TimestampFormatInfo) {
return new TimestampType(DEFAULT_PRECISION_FOR_TIMESTAMP);
+ } else if (formatInfo instanceof LocalZonedTimestampFormatInfo) {
+ return new LocalZonedTimestampType();
} else if (formatInfo instanceof ArrayFormatInfo) {
FormatInfo elementFormatInfo = ((ArrayFormatInfo) formatInfo).getElementFormatInfo();
return new ArrayType(deriveLogicalType(elementFormatInfo));
@@ -402,6 +409,8 @@ public class TableFormatUtils {
return Types.SQL_TIME;
} else if (typeInfo instanceof TimestampTypeInfo) {
return Types.SQL_TIMESTAMP;
+ } else if (typeInfo instanceof LocalZonedTimestampTypeInfo) {
+ return Types.LOCAL_DATE_TIME;
} else if (typeInfo instanceof BinaryTypeInfo) {
return Types.PRIMITIVE_ARRAY(Types.BYTE);
} else if (typeInfo instanceof ArrayTypeInfo) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/Constants.java
similarity index 55%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/Constants.java
index fafae28..d196ba6 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/Constants.java
@@ -15,28 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.formats.common;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+public class Constants {
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
-
- private final char splitter;
-
- // TODO: support mapping index to field
+ public static final String DATE_AND_TIME_STANDARD_SQL = "SQL";
- @JsonCreator
- public CsvDeserializationInfo(
- @JsonProperty("splitter") char splitter) {
- this.splitter = splitter;
- }
+ public static final String DATE_AND_TIME_STANDARD_ISO_8601 = "ISO_8601";
- @JsonProperty("splitter")
- public char getSplitter() {
- return splitter;
- }
}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java
index 7bb18cf..7b3488f 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java
@@ -29,6 +29,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
+
/**
* The format information for {@link Date}s.
*/
@@ -53,8 +56,10 @@ public class DateFormatInfo implements BasicFormatInfo<Date> {
this.format = format;
if (!format.equals("SECONDS")
- && !format.equals("MILLIS")
- && !format.equals("MICROS")) {
+ && !format.equals("MILLIS")
+ && !format.equals("MICROS")
+ && !DATE_AND_TIME_STANDARD_SQL.equals(format)
+ && !DATE_AND_TIME_STANDARD_ISO_8601.equals(format)) {
this.simpleDateFormat = new SimpleDateFormat(format);
} else {
this.simpleDateFormat = null;
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
index 79b8411..4272fa4 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
@@ -44,7 +44,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@JsonSubTypes.Type(name = "map", value = MapFormatInfo.class),
@JsonSubTypes.Type(name = "row", value = RowFormatInfo.class),
@JsonSubTypes.Type(name = "binary", value = BinaryFormatInfo.class),
- @JsonSubTypes.Type(name = "null", value = NullFormatInfo.class)
+ @JsonSubTypes.Type(name = "null", value = NullFormatInfo.class),
+ @JsonSubTypes.Type(name = "local_zoned_timestamp", value = LocalZonedTimestampFormatInfo.class)
})
public interface FormatInfo extends Serializable {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
similarity index 51%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
copy to inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
index 42abe16..9afc8da 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
@@ -15,36 +15,31 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol;
-
-import com.google.common.base.Preconditions;
-import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+package org.apache.inlong.sort.formats.common;
import java.util.Objects;
-public class FieldInfo {
- @JsonProperty("name")
- private final String name;
+public class LocalZonedTimestampFormatInfo implements FormatInfo {
+
+ private static final long serialVersionUID = -7501810151856898046L;
- @JsonProperty("format_info")
- private final FormatInfo formatInfo;
+ private final String format;
- @JsonCreator
- public FieldInfo(
- @JsonProperty("name") String name,
- @JsonProperty("format_info") FormatInfo formatInfo) {
- this.name = Preconditions.checkNotNull(name);
- this.formatInfo = Preconditions.checkNotNull(formatInfo);
+ public LocalZonedTimestampFormatInfo(String format) {
+ this.format = format;
}
- public String getName() {
- return name;
+ public LocalZonedTimestampFormatInfo() {
+ this("yyyy-MM-dd HH:mm:ss");
}
- public FormatInfo getFormatInfo() {
- return formatInfo;
+ public String getFormat() {
+ return format;
+ }
+
+ @Override
+ public TypeInfo getTypeInfo() {
+ return LocalZonedTimestampTypeInfo.INSTANCE;
}
@Override
@@ -55,12 +50,20 @@ public class FieldInfo {
if (o == null || getClass() != o.getClass()) {
return false;
}
- FieldInfo fieldInfo = (FieldInfo) o;
- return name.equals(fieldInfo.name) && formatInfo.equals(fieldInfo.formatInfo);
+ LocalZonedTimestampFormatInfo that = (LocalZonedTimestampFormatInfo) o;
+ return Objects.equals(format, that.format);
}
@Override
public int hashCode() {
- return Objects.hash(name, formatInfo);
+ return Objects.hash(format);
}
+
+ @Override
+ public String toString() {
+ return "LocalZonedTimestampFormatInfo{"
+ + "format='" + format + '\''
+ + '}';
+ }
+
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampTypeInfo.java
similarity index 56%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampTypeInfo.java
index fafae28..cdc66f8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampTypeInfo.java
@@ -15,28 +15,31 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.formats.common;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+public class LocalZonedTimestampTypeInfo implements TypeInfo {
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
+ private static final long serialVersionUID = -837081640968386059L;
+
+ public static final LocalZonedTimestampTypeInfo INSTANCE = new LocalZonedTimestampTypeInfo();
- private final char splitter;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
- // TODO: support mapping index to field
+ return o != null && getClass() == o.getClass();
+ }
- @JsonCreator
- public CsvDeserializationInfo(
- @JsonProperty("splitter") char splitter) {
- this.splitter = splitter;
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
}
- @JsonProperty("splitter")
- public char getSplitter() {
- return splitter;
+ @Override
+ public String toString() {
+ return "LocalZonedTimestampTypeInfo";
}
+
}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java
index f991b32..cd09759 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java
@@ -30,6 +30,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
+
/**
* The format information for {@link Time}s.
*/
@@ -54,8 +57,10 @@ public class TimeFormatInfo implements BasicFormatInfo<Time> {
this.format = format;
if (!format.equals("MICROS")
- && !format.equals("MILLIS")
- && !format.equals("SECONDS")) {
+ && !format.equals("MILLIS")
+ && !format.equals("SECONDS")
+ && !DATE_AND_TIME_STANDARD_SQL.equals(format)
+ && !DATE_AND_TIME_STANDARD_ISO_8601.equals(format)) {
this.simpleDateFormat = new SimpleDateFormat(format);
} else {
this.simpleDateFormat = null;
@@ -63,7 +68,7 @@ public class TimeFormatInfo implements BasicFormatInfo<Time> {
}
public TimeFormatInfo() {
- this("hh:mm:ss");
+ this("HH:mm:ss");
}
@Nonnull
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
index ac7c307..d26373c 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
@@ -30,6 +30,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
+
/**
* The format information for {@link Timestamp}s.
*/
@@ -55,7 +58,9 @@ public class TimestampFormatInfo implements BasicFormatInfo<Timestamp> {
if (!format.equals("MICROS")
&& !format.equals("MILLIS")
- && !format.equals("SECONDS")) {
+ && !format.equals("SECONDS")
+ && !DATE_AND_TIME_STANDARD_SQL.equals(format)
+ && !DATE_AND_TIME_STANDARD_ISO_8601.equals(format)) {
this.simpleDateFormat = new SimpleDateFormat(format);
} else {
this.simpleDateFormat = null;
@@ -63,7 +68,7 @@ public class TimestampFormatInfo implements BasicFormatInfo<Timestamp> {
}
public TimestampFormatInfo() {
- this("yyyy-MM-dd hh:mm:ss");
+ this("yyyy-MM-dd HH:mm:ss");
}
@Nonnull
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
index 626f620..2eaef3c 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
@@ -43,7 +43,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@JsonSubTypes.Type(name = "map", value = MapTypeInfo.class),
@JsonSubTypes.Type(name = "row", value = RowTypeInfo.class),
@JsonSubTypes.Type(name = "binary", value = BinaryTypeInfo.class),
- @JsonSubTypes.Type(name = "null", value = NullTypeInfo.class)
+ @JsonSubTypes.Type(name = "null", value = NullTypeInfo.class),
+ @JsonSubTypes.Type(name = "local_zoned_timestamp", value = LocalZonedTimestampTypeInfo.class)
})
public interface TypeInfo extends Serializable {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfoTest.java
similarity index 56%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfoTest.java
index fafae28..4389b03 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfoTest.java
@@ -15,28 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.formats.common;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Collection;
+import java.util.Collections;
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
-
- private final char splitter;
+public class LocalZonedTimestampFormatInfoTest extends FormatInfoTestBase {
- // TODO: support mapping index to field
-
- @JsonCreator
- public CsvDeserializationInfo(
- @JsonProperty("splitter") char splitter) {
- this.splitter = splitter;
+ @Override
+ Collection<FormatInfo> createFormatInfos() {
+ return Collections.singleton(new LocalZonedTimestampFormatInfo());
}
- @JsonProperty("splitter")
- public char getSplitter() {
- return splitter;
- }
}
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index cb8e925..200e03e 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -120,6 +120,16 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-common</artifactId>
+ </dependency>
+
<!-- test -->
<dependency>
<groupId>org.apache.kafka</groupId>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 330cf72..e08900c 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -18,12 +18,16 @@
package org.apache.inlong.sort.singletenant.flink;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSinkStream;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuilder.buildPulsarSource;
-import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.Map;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,6 +47,9 @@ import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
import org.apache.inlong.sort.singletenant.flink.clickhouse.ClickhouseRowSinkFunction;
+import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationFunction;
+import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
import org.apache.inlong.sort.util.ParameterTool;
@@ -62,14 +69,18 @@ public class Entrance {
env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- DataStream<Row> sourceStream = buildSourceStream(
+ DataStream<SerializedRecord> sourceStream = buildSourceStream(
env,
config,
- dataFlowInfo.getSourceInfo()
+ dataFlowInfo.getSourceInfo(),
+ dataFlowInfo.getProperties()
);
+ DataStream<Row> deserializedStream =
+ buildDeserializationStream(sourceStream, dataFlowInfo.getSourceInfo(), config);
+
buildSinkStream(
- sourceStream,
+ deserializedStream,
config,
dataFlowInfo.getSinkInfo(),
dataFlowInfo.getProperties(),
@@ -83,24 +94,40 @@ public class Entrance {
return objectMapper.readValue(new File(fileName), DataFlowInfo.class);
}
- private static DataStream<Row> buildSourceStream(
+ private static DataStream<SerializedRecord> buildSourceStream(
StreamExecutionEnvironment env,
Configuration config,
- SourceInfo sourceInfo) {
+ SourceInfo sourceInfo,
+ Map<String, Object> properties) {
final String sourceType = checkNotNull(config.getString(Constants.SOURCE_TYPE));
final int sourceParallelism = config.getInteger(Constants.SOURCE_PARALLELISM);
- DataStream<Row> sourceStream = null;
if (sourceType.equals(Constants.SOURCE_TYPE_PULSAR)) {
- Preconditions.checkState(sourceInfo instanceof PulsarSourceInfo);
+ checkState(sourceInfo instanceof PulsarSourceInfo);
PulsarSourceInfo pulsarSourceInfo = (PulsarSourceInfo) sourceInfo;
- // TODO : implement pulsar source function
+ return env.addSource(buildPulsarSource(pulsarSourceInfo, config, properties))
+ .uid(Constants.SOURCE_UID)
+ .name("Pulsar source")
+ .setParallelism(sourceParallelism)
+ .rebalance();
} else {
throw new IllegalArgumentException("Unsupported source type " + sourceType);
}
+ }
- return sourceStream;
+ private static DataStream<Row> buildDeserializationStream(
+ DataStream<SerializedRecord> sourceStream,
+ SourceInfo sourceInfo,
+ Configuration config
+ ) throws IOException, ClassNotFoundException {
+ DeserializationSchema<Row> schema =
+ DeserializationSchemaFactory.build(sourceInfo.getFields(), sourceInfo.getDeserializationInfo());
+ DeserializationFunction function = new DeserializationFunction(schema);
+ return sourceStream.process(function)
+ .uid(Constants.DESERIALIZATION_SCHEMA_UID)
+ .name("Deserialization")
+ .setParallelism(config.getInteger(Constants.DESERIALIZATION_PARALLELISM));
}
private static void buildSinkStream(
@@ -108,14 +135,14 @@ public class Entrance {
Configuration config,
SinkInfo sinkInfo,
Map<String, Object> properties,
- long dataflowId) {
+ long dataflowId) throws IOException, ClassNotFoundException {
final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
final int sinkParallelism = config.getInteger(Constants.SINK_PARALLELISM);
// TODO : implement sink functions below
switch (sinkType) {
case Constants.SINK_TYPE_CLICKHOUSE:
- Preconditions.checkState(sinkInfo instanceof ClickHouseSinkInfo);
+ checkState(sinkInfo instanceof ClickHouseSinkInfo);
ClickHouseSinkInfo clickHouseSinkInfo = (ClickHouseSinkInfo) sinkInfo;
sourceStream.addSink(new ClickhouseRowSinkFunction(clickHouseSinkInfo))
@@ -124,7 +151,7 @@ public class Entrance {
.setParallelism(sinkParallelism);
break;
case Constants.SINK_TYPE_HIVE:
- Preconditions.checkState(sinkInfo instanceof HiveSinkInfo);
+ checkState(sinkInfo instanceof HiveSinkInfo);
HiveSinkInfo hiveSinkInfo = (HiveSinkInfo) sinkInfo;
if (hiveSinkInfo.getPartitions().length == 0) {
@@ -147,19 +174,26 @@ public class Entrance {
break;
case Constants.SINK_TYPE_ICEBERG:
- Preconditions.checkState(sinkInfo instanceof IcebergSinkInfo);
+ checkState(sinkInfo instanceof IcebergSinkInfo);
IcebergSinkInfo icebergSinkInfo = (IcebergSinkInfo) sinkInfo;
TableLoader tableLoader = TableLoader.fromHadoopTable(
icebergSinkInfo.getTableLocation(),
new org.apache.hadoop.conf.Configuration());
- FlinkSink.forRow(sourceStream, CommonUtils.getTableSchema(sinkInfo))
+ FlinkSink.forRow(sourceStream, CommonUtils.getTableSchema(sinkInfo.getFields()))
.tableLoader(tableLoader)
.writeParallelism(sinkParallelism)
.build();
break;
case Constants.SINK_TYPE_KAFKA:
- buildKafkaSinkStream(sourceStream, (KafkaSinkInfo) sinkInfo, properties, config, sinkParallelism);
+ checkState(sinkInfo instanceof KafkaSinkInfo);
+ SerializationSchema<Row> schema = SerializationSchemaFactory.build(sinkInfo.getFields(),
+ ((KafkaSinkInfo) sinkInfo).getSerializationInfo());
+ sourceStream
+ .addSink(buildKafkaSink((KafkaSinkInfo) sinkInfo, properties, schema, config))
+ .uid(Constants.SINK_UID)
+ .name("Kafka Sink")
+ .setParallelism(sinkParallelism);
break;
default:
throw new IllegalArgumentException("Unsupported sink type " + sinkType);
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/SerializedRecord.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/SerializedRecord.java
new file mode 100644
index 0000000..057c8e6
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/SerializedRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink;
+
+import java.io.Serializable;
+
+public class SerializedRecord implements Serializable {
+
+ private static final long serialVersionUID = 5418156417016358730L;
+
+ // Event time
+ private long timestampMillis;
+
+ private byte[] data;
+
+ /**
+ * Just satisfy requirement of Flink Pojo definition.
+ */
+ public SerializedRecord() {
+
+ }
+
+ public SerializedRecord(long timestampMillis, byte[] data) {
+ this.timestampMillis = timestampMillis;
+ this.data = data;
+ }
+
+ public void setTimestampMillis(long timestampMillis) {
+ this.timestampMillis = timestampMillis;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public long getTimestampMillis() {
+ return timestampMillis;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
index da37cda..812bdec 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
@@ -29,6 +29,8 @@ import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
public class ClickhouseRowSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+ private static final long serialVersionUID = -2043675961013551232L;
+
private final ClickHouseSinkFunction clickHouseSinkFunction;
public ClickhouseRowSinkFunction(ClickHouseSinkInfo clickHouseSinkInfo) {
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
new file mode 100644
index 0000000..7ca93a5
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.canal.CanalJsonDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
+
+public class CanalDeserializationSchemaBuilder {
+
+ private static final List<String> ALL_SUPPORTED_METADATA_KEYS =
+ Arrays.asList("database", "table", "sql-type", "pk-names", "ingestion-timestamp", "event-timestamp");
+
+ public static DeserializationSchema<Row> build(
+ FieldInfo[] fieldInfos,
+ CanalDeserializationInfo deserializationInfo
+ ) throws IOException, ClassNotFoundException {
+ String timestampFormatStandard = deserializationInfo.getTimestampFormatStandard();
+ boolean includeMetadata = deserializationInfo.isIncludeMetadata();
+ CanalJsonDecodingFormat canalJsonDecodingFormat = createCanalJsonDecodingFormat(
+ deserializationInfo.getDatabase(),
+ deserializationInfo.getTable(),
+ deserializationInfo.isIgnoreParseErrors(),
+ timestampFormatStandard,
+ includeMetadata
+ );
+
+ FieldInfo[] convertedInputFields = convertDateToStringFormatInfo(fieldInfos);
+ DeserializationSchema<RowData> canalSchema = canalJsonDecodingFormat.createRuntimeDecoder(
+ new DynamicTableSource.Context() {
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(DataType dataType) {
+ validateInputDataType(dataType);
+ return InternalTypeInfo.of(dataType.getLogicalType());
+ }
+
+ @Override
+ public DynamicTableSource.DataStructureConverter createDataStructureConverter(DataType dataType) {
+ return null;
+ }
+ },
+ convertFieldInfosToDataType(convertedInputFields)
+ );
+
+ return wrapCanalDeserializationSchema(canalSchema, includeMetadata, fieldInfos, timestampFormatStandard);
+ }
+
+ private static DeserializationSchema<Row> wrapCanalDeserializationSchema(
+ DeserializationSchema<RowData> canalSchema,
+ boolean includeMetadata,
+ FieldInfo[] origFieldInfos,
+ String timestampFormatStandard
+ ) throws IOException, ClassNotFoundException {
+ FieldInfo[] allFields;
+ if (includeMetadata) {
+ allFields = new FieldInfo[origFieldInfos.length + ALL_SUPPORTED_METADATA_KEYS.size()];
+ System.arraycopy(origFieldInfos, 0, allFields, 0, origFieldInfos.length);
+ FieldInfo[] metadataFields = buildMetadataFields(timestampFormatStandard);
+ System.arraycopy(metadataFields, 0, allFields, origFieldInfos.length, metadataFields.length);
+ } else {
+ allFields = origFieldInfos;
+ }
+
+ FieldInfo[] convertedAllFields = convertDateToStringFormatInfo(allFields);
+ RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema =
+ new RowDataToRowDeserializationSchemaWrapper(canalSchema, convertedAllFields);
+ return new CustomDateFormatDeserializationSchemaWrapper(rowDataToRowSchema, extractFormatInfos(allFields));
+ }
+
+ private static CanalJsonDecodingFormat createCanalJsonDecodingFormat(
+ String database,
+ String table,
+ boolean ignoreParseErrors,
+ String timestampFormatStandard,
+ boolean includeMetadata
+ ) {
+ TimestampFormat timestampFormat = getTimestampFormatStandard(timestampFormatStandard);
+ CanalJsonDecodingFormat canalJsonDecodingFormat =
+ new CanalJsonDecodingFormat(database, table, ignoreParseErrors, timestampFormat);
+ if (includeMetadata) {
+ canalJsonDecodingFormat.applyReadableMetadata(ALL_SUPPORTED_METADATA_KEYS);
+ }
+
+ return canalJsonDecodingFormat;
+ }
+
+ private static FieldInfo[] buildMetadataFields(String timestampStandard) {
+ return new FieldInfo[]{
+ new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(0), StringFormatInfo.INSTANCE),
+ new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(1), StringFormatInfo.INSTANCE),
+ new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(2),
+ new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)),
+ new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(3), new ArrayFormatInfo(StringFormatInfo.INSTANCE)),
+ new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(4), new LocalZonedTimestampFormatInfo(timestampStandard)),
+ new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(5), new LocalZonedTimestampFormatInfo(timestampStandard))
+ };
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..c3cc687
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.ParseException;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.isStandardTimestampFormat;
+
+public class CustomDateFormatDeserializationSchemaWrapper implements DeserializationSchema<Row> {
+
+ private static final long serialVersionUID = -8124501384364175884L;
+
+ private final DeserializationSchema<Row> innerSchema;
+
+ private final FormatInfo[] formatInfos;
+
+ public CustomDateFormatDeserializationSchemaWrapper(
+ DeserializationSchema<Row> innerSchema, FormatInfo[] formatInfos) {
+ this.innerSchema = checkNotNull(innerSchema);
+ this.formatInfos = checkNotNull(formatInfos);
+ }
+
+ @Override
+ public Row deserialize(byte[] message) {
+ throw new RuntimeException(
+ "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<Row> out) throws IOException {
+ ListCollector<Row> collector = new ListCollector<>();
+ innerSchema.deserialize(message, collector);
+ for (Row row : collector.getInnerList()) {
+ try {
+ out.collect(fromStringToDateAndTime(row));
+ } catch (ParseException e) {
+ throw new IOException("Failed to parse input date or time, error is " + e);
+ }
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(Row nextElement) {
+ return innerSchema.isEndOfStream(nextElement);
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return innerSchema.getProducedType();
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ innerSchema.open(context);
+ }
+
+ @VisibleForTesting
+ Row fromStringToDateAndTime(Row inputRow) throws ParseException {
+ int arity = inputRow.getArity();
+ Row outputRow = new Row(arity);
+ for (int i = 0; i < arity; i++) {
+ outputRow.setField(i, convert(inputRow.getField(i), formatInfos[i]));
+ }
+ outputRow.setKind(inputRow.getKind());
+ return outputRow;
+ }
+
+ private Object convert(Object input, FormatInfo formatInfo) throws ParseException {
+
+ if (formatInfo instanceof DateFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+ checkState(input instanceof String);
+ java.util.Date date =
+ FastDateFormat.getInstance(((DateFormatInfo) formatInfo).getFormat()).parse((String) input);
+ return new Date(date.getTime());
+
+ } else if (formatInfo instanceof TimeFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+ checkState(input instanceof String);
+ java.util.Date date =
+ FastDateFormat.getInstance(((TimeFormatInfo) formatInfo).getFormat()).parse((String) input);
+ return new Time(date.getTime());
+
+ } else if (formatInfo instanceof TimestampFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+ checkState(input instanceof String);
+ java.util.Date date =
+ FastDateFormat.getInstance(((TimestampFormatInfo) formatInfo).getFormat()).parse((String) input);
+ return new Timestamp(date.getTime());
+
+ } else {
+ return input;
+ }
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
new file mode 100644
index 0000000..e8bfe2f
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.commons.msg.InLongMsg;
+import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
+
+import java.util.Iterator;
+
+public class DeserializationFunction extends ProcessFunction<SerializedRecord, Row> {
+
+ private static final long serialVersionUID = 6245991845787657154L;
+
+ private final DeserializationSchema<Row> deserializationSchema;
+
+ public DeserializationFunction(
+ DeserializationSchema<Row> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public void processElement(
+ SerializedRecord value,
+ ProcessFunction<SerializedRecord, Row>.Context ctx,
+ Collector<Row> out
+ ) throws Exception {
+ InLongMsg inLongMsg = InLongMsg.parseFrom(value.getData());
+
+ for (String attr : inLongMsg.getAttrs()) {
+ Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
+ if (iterator == null) {
+ continue;
+ }
+
+ while (iterator.hasNext()) {
+
+ byte[] bodyBytes = iterator.next();
+ if (bodyBytes == null || bodyBytes.length == 0) {
+ continue;
+ }
+
+ deserializationSchema.deserialize(bodyBytes, out);
+ }
+ }
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
new file mode 100644
index 0000000..12f8dcb
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+
+public class DeserializationSchemaFactory {
+
+ public static DeserializationSchema<Row> build(
+ FieldInfo[] fieldInfos,
+ DeserializationInfo deserializationInfo) throws IOException, ClassNotFoundException {
+ if (deserializationInfo instanceof JsonDeserializationInfo) {
+ return buildJsonDeserializationSchema(fieldInfos);
+ } else if (deserializationInfo instanceof AvroDeserializationInfo) {
+ return buildAvroDeserializationSchema(fieldInfos);
+ } else if (deserializationInfo instanceof CanalDeserializationInfo) {
+ return CanalDeserializationSchemaBuilder.build(fieldInfos, (CanalDeserializationInfo) deserializationInfo);
+ } else {
+ return buildStringDeserializationSchema(fieldInfos);
+ }
+ }
+
+ private static DeserializationSchema<Row> buildJsonDeserializationSchema(
+ FieldInfo[] fieldInfos) throws IOException, ClassNotFoundException {
+ FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
+ RowTypeInfo rowTypeInfo = convertFieldInfosToRowTypeInfo(convertedFieldInfos);
+ JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(rowTypeInfo).build();
+ return new CustomDateFormatDeserializationSchemaWrapper(jsonSchema, extractFormatInfos(fieldInfos));
+ }
+
+ private static DeserializationSchema<Row> buildAvroDeserializationSchema(FieldInfo[] fieldInfos) {
+ String avroSchemaInJson = buildAvroRecordSchemaInJson(fieldInfos);
+ return new AvroRowDeserializationSchema(avroSchemaInJson);
+ }
+
+ private static DeserializationSchema<Row> buildStringDeserializationSchema(FieldInfo[] fieldInfos) {
+ return new DeserializationSchema<Row>() {
+
+ private static final long serialVersionUID = 9206114128358065420L;
+
+ private final RowTypeInfo rowTypeInfo = convertFieldInfosToRowTypeInfo(fieldInfos);
+
+ @Override
+ public Row deserialize(byte[] message) {
+ Row row = new Row(1);
+ row.setField(0, new String(message, StandardCharsets.UTF_8));
+ return row;
+ }
+
+ @Override
+ public boolean isEndOfStream(Row nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return rowTypeInfo;
+ }
+ };
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/ListCollector.java
similarity index 56%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/ListCollector.java
index fafae28..4df161f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/ListCollector.java
@@ -15,28 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.singletenant.flink.deserialization;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.util.Collector;
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
+import java.util.ArrayList;
+import java.util.List;
+
+public class ListCollector<T> implements Collector<T> {
- private final char splitter;
+ private List<T> innerList = new ArrayList<>();
- // TODO: support mapping index to field
+ @Override
+ public void collect(T record) {
+ innerList.add(record);
+ }
- @JsonCreator
- public CsvDeserializationInfo(
- @JsonProperty("splitter") char splitter) {
- this.splitter = splitter;
+ @Override
+ public void close() {
+ innerList = null;
}
- @JsonProperty("splitter")
- public char getSplitter() {
- return splitter;
+ public List<T> getInnerList() {
+ return innerList;
}
+
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/RowDataToRowDeserializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/RowDataToRowDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..18749e4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/RowDataToRowDeserializationSchemaWrapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import java.io.IOException;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
+
+public class RowDataToRowDeserializationSchemaWrapper implements DeserializationSchema<Row> {
+
+ private static final long serialVersionUID = 6918677263991851770L;
+
+ private final DeserializationSchema<RowData> innerSchema;
+
+ private final DataFormatConverters.RowConverter rowConverter;
+
+ private final TypeInformation<Row> producedTypeInfo;
+
+ public RowDataToRowDeserializationSchemaWrapper(
+ DeserializationSchema<RowData> innerSchema, FieldInfo[] fieldInfos) {
+ this.innerSchema = checkNotNull(innerSchema);
+ rowConverter = createRowConverter(checkNotNull(fieldInfos));
+ producedTypeInfo = convertFieldInfosToRowTypeInfo(fieldInfos);
+ }
+
+ @Override
+ public Row deserialize(byte[] message) {
+ throw new RuntimeException(
+ "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<Row> out) throws IOException {
+ ListCollector<RowData> collector = new ListCollector<>();
+ innerSchema.deserialize(message, collector);
+ collector.getInnerList().forEach(record -> out.collect(rowConverter.toExternal(record)));
+ }
+
+ @Override
+ public boolean isEndOfStream(Row nextElement) {
+ return innerSchema.isEndOfStream(rowConverter.toInternal(nextElement));
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ innerSchema.open(context);
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
index 798a69f..298e052 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
@@ -18,34 +18,25 @@
package org.apache.inlong.sort.singletenant.flink.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.configuration.Constants;
-import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Map;
import java.util.Properties;
import static org.apache.inlong.sort.configuration.Constants.SINK_KAFKA_PRODUCER_POOL_SIZE;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
public class KafkaSinkBuilder {
- public static <T> SinkFunction<T> buildKafkaSink(
+ public static SinkFunction<Row> buildKafkaSink(
KafkaSinkInfo kafkaSinkInfo,
Map<String, Object> properties,
- SerializationSchema<T> serializationSchema,
+ SerializationSchema<Row> schema,
Configuration config
) {
String topic = kafkaSinkInfo.getTopic();
@@ -53,7 +44,7 @@ public class KafkaSinkBuilder {
return new FlinkKafkaProducer<>(
topic,
- serializationSchema,
+ schema,
producerProperties,
new FlinkFixedPartitioner<>(),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
@@ -68,38 +59,4 @@ public class KafkaSinkBuilder {
return producerProperties;
}
- public static void buildKafkaSinkStream(
- DataStream<Row> sourceStream,
- KafkaSinkInfo kafkaSinkInfo,
- Map<String, Object> properties,
- Configuration config,
- int sinkParallelism
- ) {
- SerializationInfo serializationInfo = kafkaSinkInfo.getSerializationInfo();
- if (serializationInfo instanceof CanalSerializationInfo) {
- DataFormatConverters.RowConverter rowConverter = createRowConverter(kafkaSinkInfo);
- DataStream<RowData> dataStream = sourceStream
- .map(rowConverter::toInternal)
- .uid(Constants.CONVERTER_UID)
- .name("Row to RowData Converter")
- .setParallelism(sinkParallelism);
-
- SerializationSchema<RowData> schema = RowDataSerializationSchemaFactory.build(
- kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
-
- dataStream
- .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
- .uid(Constants.SINK_UID)
- .name("Kafka Sink")
- .setParallelism(sinkParallelism);
- } else {
- SerializationSchema<Row> schema = RowSerializationSchemaFactory.build(
- kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
- sourceStream
- .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
- .uid(Constants.SINK_UID)
- .name("Kafka Sink")
- .setParallelism(sinkParallelism);
- }
- }
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/pulsar/PulsarSourceBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/pulsar/PulsarSourceBuilder.java
new file mode 100644
index 0000000..3359c78
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/pulsar/PulsarSourceBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.pulsar;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.flink.pulsar.PulsarDeserializationSchema;
+import org.apache.inlong.sort.flink.pulsar.PulsarSourceFunction;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
+import org.apache.pulsar.client.api.Message;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class PulsarSourceBuilder {
+
+ public static PulsarSourceFunction<SerializedRecord> buildPulsarSource(
+ PulsarSourceInfo sourceInfo,
+ Configuration config,
+ Map<String, Object> properties
+ ) {
+
+ Map<String, String> configMap = config.toMap();
+ if (properties != null && !properties.isEmpty()) {
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ configMap.put(entry.getKey(), entry.getValue().toString());
+ }
+ }
+
+ org.apache.flink.configuration.Configuration flinkConfig =
+ org.apache.flink.configuration.Configuration.fromMap(configMap);
+
+ return new PulsarSourceFunction<>(
+ sourceInfo.getAdminUrl(),
+ sourceInfo.getServiceUrl(),
+ sourceInfo.getTopic(),
+ sourceInfo.getSubscriptionName(),
+ sourceInfo.getAuthentication(),
+ new PulsarDeserializationSchemaImpl(),
+ flinkConfig
+ );
+ }
+
+ public static class PulsarDeserializationSchemaImpl implements PulsarDeserializationSchema<SerializedRecord> {
+
+ private static final long serialVersionUID = -3642110610339179932L;
+
+ @Override
+ public DeserializationResult<SerializedRecord> deserialize(
+ @SuppressWarnings("rawtypes") Message message) throws IOException {
+ final byte[] data = message.getData();
+ return DeserializationResult.of(new SerializedRecord(message.getEventTime(), data), data.length);
+ }
+
+ @Override
+ public TypeInformation<SerializedRecord> getProducedType() {
+ return TypeInformation.of(SerializedRecord.class);
+ }
+
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
similarity index 66%
rename from inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
rename to inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
index 69d7378..c7ace10 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
@@ -19,62 +19,51 @@ package org.apache.inlong.sort.singletenant.flink.serialization;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.canal.CanalJsonSerializationSchema;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
+import java.io.IOException;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
-public class RowDataSerializationSchemaFactory {
+public class CanalSerializationSchemaBuilder {
- private static final String CANAL_TIMESTAMP_STANDARD_SQL = "SQL";
- private static final String CANAL_TIMESTAMP_STANDARD_ISO = "ISO_8601";
private static final String CANAL_MAP_NULL_KEY_MODE_FAIL = "FAIL";
private static final String CANAL_MAP_NULL_KEY_MODE_DROP = "DROP";
private static final String CANAL_MAP_NULL_KEY_MODE_LITERAL = "LITERAL";
private static final String CANAL_MAP_NULL_KEY_LITERAL_DEFAULT = "null";
- public static SerializationSchema<RowData> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
- if (serializationInfo instanceof CanalSerializationInfo) {
- return buildCanalRowDataSerializationSchema(fieldInfos, (CanalSerializationInfo) serializationInfo);
- }
-
- throw new IllegalArgumentException("Unsupported RowData serialization info: " + serializationInfo);
- }
-
- private static SerializationSchema<RowData> buildCanalRowDataSerializationSchema(
+ public static SerializationSchema<Row> build(
FieldInfo[] fieldInfos,
CanalSerializationInfo canalSerializationInfo
- ) {
+ ) throws IOException, ClassNotFoundException {
String mapNullKeyLiteral = canalSerializationInfo.getMapNullKeyLiteral();
if (StringUtils.isEmpty(mapNullKeyLiteral)) {
mapNullKeyLiteral = CANAL_MAP_NULL_KEY_LITERAL_DEFAULT;
}
- RowType rowType = convertFieldInfosToRowType(fieldInfos);
- return new CanalJsonSerializationSchema(
- rowType,
+ FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
+ RowType convertedRowType = convertFieldInfosToRowType(convertedFieldInfos);
+ CanalJsonSerializationSchema canalSchema = new CanalJsonSerializationSchema(
+ convertedRowType,
getTimestampFormatStandard(canalSerializationInfo.getTimestampFormatStandard()),
getMapNullKeyMode(canalSerializationInfo.getMapNullKeyMod()),
mapNullKeyLiteral,
canalSerializationInfo.isEncodeDecimalAsPlainNumber()
);
- }
- private static TimestampFormat getTimestampFormatStandard(String input) {
- if (CANAL_TIMESTAMP_STANDARD_SQL.equals(input)) {
- return TimestampFormat.SQL;
- } else if (CANAL_TIMESTAMP_STANDARD_ISO.equals(input)) {
- return TimestampFormat.ISO_8601;
- }
+ RowToRowDataSerializationSchemaWrapper rowToRowDataSchema
+ = new RowToRowDataSerializationSchemaWrapper(canalSchema, convertedFieldInfos);
- throw new IllegalArgumentException("Unsupported timestamp format standard: " + input);
+ return new CustomDateFormatSerializationSchemaWrapper(rowToRowDataSchema, extractFormatInfos(fieldInfos));
}
private static JsonOptions.MapNullKeyMode getMapNullKeyMode(String input) {
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java
new file mode 100644
index 0000000..f271e39
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.isStandardTimestampFormat;
+
+/**
+ * A {@link SerializationSchema} wrapper that converts {@link Date}, {@link Time} and
+ * {@link Timestamp} fields into strings, using the custom pattern carried by the field's
+ * {@link FormatInfo}, before delegating serialization to the wrapped schema. Fields whose
+ * format is one of the standard ones (SQL / ISO-8601) are passed through unchanged.
+ */
+public class CustomDateFormatSerializationSchemaWrapper implements SerializationSchema<Row> {
+
+ private static final long serialVersionUID = 7342088340154604198L;
+
+ // The schema that performs the actual byte serialization after conversion.
+ private final SerializationSchema<Row> innerSchema;
+
+ // One FormatInfo per field of the input row; index i describes field i.
+ private final FormatInfo[] formatInfos;
+
+ public CustomDateFormatSerializationSchemaWrapper(SerializationSchema<Row> innerSchema, FormatInfo[] formatInfos) {
+ this.innerSchema = checkNotNull(innerSchema);
+ this.formatInfos = checkNotNull(formatInfos);
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ // Propagate initialization to the wrapped schema.
+ innerSchema.open(context);
+ }
+
+ @Override
+ public byte[] serialize(Row element) {
+ // Convert date/time fields to formatted strings first, then serialize.
+ Row outputRow = fromDateAndTimeToString(element);
+ return innerSchema.serialize(outputRow);
+ }
+
+ /** Builds a new row where every date/time field with a custom format is replaced by its string form. */
+ @VisibleForTesting
+ Row fromDateAndTimeToString(Row inputRow) {
+ int arity = inputRow.getArity();
+ Row outputRow = new Row(arity);
+ for (int i = 0; i < arity; i++) {
+ outputRow.setField(i, convert(inputRow.getField(i), formatInfos[i]));
+ }
+ // Preserve the change-log kind (INSERT/UPDATE/DELETE) of the original row.
+ outputRow.setKind(inputRow.getKind());
+ return outputRow;
+ }
+
+ // Formats a single field value when its FormatInfo is a non-standard date/time/timestamp
+ // format; any other value is returned as-is. FastDateFormat caches instances internally,
+ // so the per-call getInstance lookup is cheap.
+ private Object convert(Object input, FormatInfo formatInfo) {
+
+ if (formatInfo instanceof DateFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+ checkState(input instanceof Date);
+ return FastDateFormat.getInstance(((DateFormatInfo) formatInfo).getFormat()).format(input);
+
+ } else if (formatInfo instanceof TimeFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+ checkState(input instanceof Time);
+ return FastDateFormat.getInstance(((TimeFormatInfo) formatInfo).getFormat()).format(input);
+
+ } else if (formatInfo instanceof TimestampFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+ checkState(input instanceof Timestamp);
+ return FastDateFormat.getInstance(((TimestampFormatInfo) formatInfo).getFormat()).format(input);
+
+ } else {
+ return input;
+ }
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowToRowDataSerializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowToRowDataSerializationSchemaWrapper.java
new file mode 100644
index 0000000..83ea0e4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowToRowDataSerializationSchemaWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
+
+/**
+ * Adapts a {@link SerializationSchema} over Flink's internal {@link RowData} so it can be
+ * used where a schema over external {@link Row} is required. Each incoming Row is converted
+ * to RowData via a {@link DataFormatConverters.RowConverter} derived from the field infos.
+ */
+public class RowToRowDataSerializationSchemaWrapper implements SerializationSchema<Row> {
+
+ private static final long serialVersionUID = 2496100417131340005L;
+
+ // The RowData-based schema that produces the final bytes.
+ private final SerializationSchema<RowData> innerSchema;
+
+ // Converts external Row values to internal RowData; built from the field infos.
+ private final DataFormatConverters.RowConverter rowConverter;
+
+ public RowToRowDataSerializationSchemaWrapper(SerializationSchema<RowData> innerSchema, FieldInfo[] fieldInfos) {
+ this.innerSchema = checkNotNull(innerSchema);
+ this.rowConverter = createRowConverter(checkNotNull(fieldInfos));
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ // Propagate initialization to the wrapped schema.
+ innerSchema.open(context);
+ }
+
+ @Override
+ public byte[] serialize(Row element) {
+ // Row -> RowData -> bytes.
+ return innerSchema.serialize(rowConverter.toInternal(element));
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/SerializationSchemaFactory.java
similarity index 55%
rename from inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
rename to inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/SerializationSchemaFactory.java
index b2eb961..afb3bb9 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/SerializationSchemaFactory.java
@@ -22,41 +22,55 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
-public class RowSerializationSchemaFactory {
+public class SerializationSchemaFactory {
- public static SerializationSchema<Row> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
+ public static SerializationSchema<Row> build(
+ FieldInfo[] fieldInfos,
+ SerializationInfo serializationInfo
+ ) throws IOException, ClassNotFoundException {
if (serializationInfo instanceof JsonSerializationInfo) {
- return buildJsonRowSerializationSchema(fieldInfos);
+ return buildJsonSerializationSchema(fieldInfos);
} else if (serializationInfo instanceof AvroSerializationInfo) {
- return buildAvroRowSerializationSchema(fieldInfos);
+ return buildAvroSerializationSchema(fieldInfos);
+ } else if (serializationInfo instanceof CanalSerializationInfo) {
+ return CanalSerializationSchemaBuilder.build(fieldInfos, (CanalSerializationInfo) serializationInfo);
} else {
- return buildStringRowSerializationSchema();
+ return buildStringSerializationSchema(extractFormatInfos(fieldInfos));
}
}
- private static SerializationSchema<Row> buildJsonRowSerializationSchema(FieldInfo[] fieldInfos) {
- RowTypeInfo rowTypeInfo = convertFieldInfosToRowTypeInfo(fieldInfos);
+ private static SerializationSchema<Row> buildJsonSerializationSchema(
+ FieldInfo[] fieldInfos
+ ) throws IOException, ClassNotFoundException {
+ FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
+ RowTypeInfo convertedRowTypeInfo = convertFieldInfosToRowTypeInfo(convertedFieldInfos);
JsonRowSerializationSchema.Builder builder = JsonRowSerializationSchema.builder();
- return builder.withTypeInfo(rowTypeInfo).build();
+ JsonRowSerializationSchema innerSchema = builder.withTypeInfo(convertedRowTypeInfo).build();
+ return new CustomDateFormatSerializationSchemaWrapper(innerSchema, extractFormatInfos(fieldInfos));
}
- private static SerializationSchema<Row> buildAvroRowSerializationSchema(FieldInfo[] fieldInfos) {
+ private static SerializationSchema<Row> buildAvroSerializationSchema(FieldInfo[] fieldInfos) {
String avroSchemaInJson = buildAvroRecordSchemaInJson(fieldInfos);
return new AvroRowSerializationSchema(avroSchemaInJson);
}
- private static SerializationSchema<Row> buildStringRowSerializationSchema() {
- return new SerializationSchema<Row>() {
+ private static SerializationSchema<Row> buildStringSerializationSchema(FormatInfo[] formatInfos) {
+ SerializationSchema<Row> stringSchema = new SerializationSchema<Row>() {
private static final long serialVersionUID = -6818985955456373916L;
@@ -65,5 +79,8 @@ public class RowSerializationSchemaFactory {
return element.toString().getBytes(StandardCharsets.UTF_8);
}
};
+
+ return new CustomDateFormatSerializationSchemaWrapper(stringSchema, formatInfos);
}
+
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index 0ee5b7a..cd6f610 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.singletenant.flink.utils;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.TableSchema.Builder;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -27,20 +28,32 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.formats.common.TypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema;
+import static org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter.toDataType;
import static org.apache.inlong.sort.formats.base.TableFormatUtils.deriveLogicalType;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
public class CommonUtils {
- public static TableSchema getTableSchema(SinkInfo sinkInfo) {
+ public static TableSchema getTableSchema(FieldInfo[] fieldInfos) {
TableSchema.Builder builder = new Builder();
- FieldInfo[] fieldInfos = sinkInfo.getFields();
for (FieldInfo fieldInfo : fieldInfos) {
builder.field(
@@ -68,7 +81,7 @@ public class CommonUtils {
return new org.apache.flink.api.java.typeutils.RowTypeInfo(typeInformationArray, fieldNames);
}
- public static String buildAvroRecordSchemaInJson(FieldInfo[] fieldInfos) {
+ public static LogicalType convertFieldInfosToLogicalType(FieldInfo[] fieldInfos) {
int fieldLength = fieldInfos.length;
String[] fieldNames = new String[fieldLength];
FormatInfo[] fieldFormatInfos = new FormatInfo[fieldLength];
@@ -78,7 +91,11 @@ public class CommonUtils {
}
RowFormatInfo rowFormatInfo = new RowFormatInfo(fieldNames, fieldFormatInfos);
- LogicalType logicalType = deriveLogicalType(rowFormatInfo);
+ return deriveLogicalType(rowFormatInfo);
+ }
+
+ public static String buildAvroRecordSchemaInJson(FieldInfo[] fieldInfos) {
+ LogicalType logicalType = convertFieldInfosToLogicalType(fieldInfos);
Schema schema = convertToSchema(logicalType);
if (schema.isUnion()) {
@@ -87,8 +104,13 @@ public class CommonUtils {
return schema.toString();
}
- public static DataFormatConverters.RowConverter createRowConverter(SinkInfo sinkInfo) {
- DataType[] fieldDataTypes = getTableSchema(sinkInfo).getFieldDataTypes();
+ public static DataType convertFieldInfosToDataType(FieldInfo[] fieldInfos) {
+ LogicalType logicalType = convertFieldInfosToLogicalType(fieldInfos);
+ return toDataType(logicalType);
+ }
+
+ public static DataFormatConverters.RowConverter createRowConverter(FieldInfo[] fieldInfos) {
+ DataType[] fieldDataTypes = getTableSchema(fieldInfos).getFieldDataTypes();
return new DataFormatConverters.RowConverter(fieldDataTypes);
}
@@ -104,4 +126,71 @@ public class CommonUtils {
return RowType.of(fieldLogicalTypes, fieldNames);
}
+ /**
+ * Maps a standard-name string to Flink's {@link TimestampFormat}.
+ *
+ * @param input either the SQL or the ISO-8601 standard constant
+ * @throws IllegalArgumentException if the input matches neither standard
+ */
+ public static TimestampFormat getTimestampFormatStandard(String input) {
+ if (DATE_AND_TIME_STANDARD_SQL.equals(input)) {
+ return TimestampFormat.SQL;
+ } else if (DATE_AND_TIME_STANDARD_ISO_8601.equals(input)) {
+ return TimestampFormat.ISO_8601;
+ }
+
+ throw new IllegalArgumentException("Unsupported timestamp format standard: " + input);
+ }
+
+ /**
+ * Deep-copies a serializable object by round-tripping it through Java serialization.
+ * The input (and everything it references) must be {@link Serializable}.
+ *
+ * @return a structurally independent copy of the input
+ */
+ public static Object deepCopy(Serializable input) throws IOException, ClassNotFoundException {
+ byte[] bytes;
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(input);
+ bytes = baos.toByteArray();
+ }
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return ois.readObject();
+ }
+ }
+
+ // TODO: support map and array
+ /**
+ * Returns a deep copy of the field infos in which every date/time/timestamp field with a
+ * non-standard (custom pattern) format is replaced by {@link StringFormatInfo}, so that
+ * such fields are carried as plain strings. The input array is left unmodified.
+ */
+ public static FieldInfo[] convertDateToStringFormatInfo(FieldInfo[] inputInfos)
+ throws IOException, ClassNotFoundException {
+ FieldInfo[] copiedInfos = (FieldInfo[]) deepCopy(inputInfos);
+ for (FieldInfo copiedInfo : copiedInfos) {
+ FormatInfo formatInfo = copiedInfo.getFormatInfo();
+ if (formatInfo instanceof DateFormatInfo
+ || formatInfo instanceof TimeFormatInfo
+ || formatInfo instanceof TimestampFormatInfo) {
+ if (!isStandardTimestampFormat(formatInfo)) {
+ copiedInfo.setFormatInfo(StringFormatInfo.INSTANCE);
+ }
+ }
+ }
+
+ return copiedInfos;
+ }
+
+ /**
+ * Returns true if the given format info is a date/time/timestamp format whose pattern is
+ * one of the standard constants (SQL or ISO-8601); false for custom patterns and for any
+ * non-temporal format info.
+ */
+ public static boolean isStandardTimestampFormat(FormatInfo formatInfo) {
+ if (formatInfo instanceof DateFormatInfo) {
+ String format = ((DateFormatInfo) formatInfo).getFormat();
+ return DATE_AND_TIME_STANDARD_SQL.equals(format) || DATE_AND_TIME_STANDARD_ISO_8601.equals(format);
+ } else if (formatInfo instanceof TimeFormatInfo) {
+ String format = ((TimeFormatInfo) formatInfo).getFormat();
+ return DATE_AND_TIME_STANDARD_SQL.equals(format) || DATE_AND_TIME_STANDARD_ISO_8601.equals(format);
+ } else if (formatInfo instanceof TimestampFormatInfo) {
+ String format = ((TimestampFormatInfo) formatInfo).getFormat();
+ return DATE_AND_TIME_STANDARD_SQL.equals(format) || DATE_AND_TIME_STANDARD_ISO_8601.equals(format);
+ }
+
+ return false;
+ }
+
+ /** Extracts the {@link FormatInfo} of each field, preserving field order. */
+ public static FormatInfo[] extractFormatInfos(FieldInfo[] fieldInfos) {
+ int length = fieldInfos.length;
+ FormatInfo[] output = new FormatInfo[length];
+ for (int i = 0; i < length; i++) {
+ output[i] = fieldInfos[i].getFormatInfo();
+ }
+
+ return output;
+ }
+
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/AvroDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/AvroDeserializationTest.java
new file mode 100644
index 0000000..e4c3946
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/AvroDeserializationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trip test for Avro: a row serialized by the schema from
+ * {@link SerializationSchemaFactory} must deserialize back to an equal row via
+ * {@link DeserializationSchemaFactory}.
+ */
+public class AvroDeserializationTest {
+
+ // Covers primitives, binary, custom-pattern date/time/timestamp, and (nested) maps.
+ private final FieldInfo[] fieldInfos = new FieldInfo[]{
+ new FieldInfo("id", LongFormatInfo.INSTANCE),
+ new FieldInfo("name", StringFormatInfo.INSTANCE),
+ new FieldInfo("bytes", BinaryFormatInfo.INSTANCE),
+ new FieldInfo("date", new DateFormatInfo("yyyy-MM-dd")),
+ new FieldInfo("time", new TimeFormatInfo("HH:mm:ss")),
+ new FieldInfo("timestamp", new TimestampFormatInfo("yyyy-MM-dd HH:mm:ss")),
+ new FieldInfo("map",
+ new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
+ new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
+ new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
+ };
+
+ // Builds one row matching fieldInfos above, with a fixed value per field.
+ private Row generateTestRow() {
+ Row testRow = new Row(8);
+ testRow.setField(0, 1238123899121L);
+ testRow.setField(1, "testName");
+
+ byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
+ testRow.setField(2, bytes);
+
+ testRow.setField(3, Date.valueOf("1990-10-14"));
+ testRow.setField(4, Time.valueOf("12:12:43"));
+ testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+
+ Map<String, Long> map = new HashMap<>();
+ map.put("flink", 123L);
+ testRow.setField(6, map);
+
+ Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+ Map<String, Integer> innerMap = new HashMap<>();
+ innerMap.put("key", 234);
+ nestedMap.put("inner_map", innerMap);
+ testRow.setField(7, nestedMap);
+
+ return testRow;
+ }
+
+ // serialize(row) followed by deserialize(bytes) must yield the original row.
+ @Test
+ public void testAvroDeserializationSchema() throws IOException, ClassNotFoundException {
+ SerializationSchema<Row> serializationSchema =
+ SerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
+
+ Row expectedRow = generateTestRow();
+ byte[] bytes = serializationSchema.serialize(expectedRow);
+
+ DeserializationSchema<Row> deserializationSchema =
+ DeserializationSchemaFactory.build(fieldInfos, new AvroDeserializationInfo());
+ Row actualRow = deserializationSchema.deserialize(bytes);
+
+ assertEquals(expectedRow, actualRow);
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
new file mode 100644
index 0000000..b649ef0
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for canal-json deserialization schemas built by {@link DeserializationSchemaFactory}
+ * from a {@link CanalDeserializationInfo}: plain data columns, appended metadata columns,
+ * and database-name filtering.
+ */
+public class CanalDeserializationTest {
+
+ // Covers primitives, binary, custom-pattern date/time/timestamp, and (nested) maps.
+ private final FieldInfo[] fieldInfos = new FieldInfo[]{
+ new FieldInfo("id", LongFormatInfo.INSTANCE),
+ new FieldInfo("name", StringFormatInfo.INSTANCE),
+ new FieldInfo("bytes", BinaryFormatInfo.INSTANCE),
+ new FieldInfo("date", new DateFormatInfo("yyyy-MM-dd")),
+ new FieldInfo("time", new TimeFormatInfo("HH:mm:ss")),
+ new FieldInfo("timestamp", new TimestampFormatInfo("yyyy-MM-dd HH:mm:ss")),
+ new FieldInfo("map",
+ new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
+ new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
+ new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
+ };
+
+ // Builds the row expected from the JSON "data" payload used below.
+ private Row generateTestRow() {
+ Row testRow = new Row(8);
+ testRow.setField(0, 1238123899121L);
+ testRow.setField(1, "testName");
+
+ byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
+ testRow.setField(2, bytes);
+
+ testRow.setField(3, Date.valueOf("1990-10-14"));
+ testRow.setField(4, Time.valueOf("12:12:43"));
+ testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+
+ Map<String, Long> map = new HashMap<>();
+ map.put("flink", 123L);
+ testRow.setField(6, map);
+
+ Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+ Map<String, Integer> innerMap = new HashMap<>();
+ innerMap.put("key", 234);
+ nestedMap.put("inner_map", innerMap);
+ testRow.setField(7, nestedMap);
+
+ return testRow;
+ }
+
+ // Minimal canal record (only "data" and "type"), no metadata columns requested:
+ // the output row must contain just the data columns.
+ @Test
+ public void testDeserializeStringWithoutMetadata() throws IOException, ClassNotFoundException {
+ String testString = "{\n"
+ + " \"data\":[\n"
+ + " {\n"
+ + " \"id\":1238123899121,\n"
+ + " \"name\":\"testName\",\n"
+ + " \"bytes\":\"AQIDBAUG\",\n"
+ + " \"date\":\"1990-10-14\",\n"
+ + " \"time\":\"12:12:43\",\n"
+ + " \"timestamp\":\"1990-10-14 12:12:43\",\n"
+ + " \"map\":{\n"
+ + " \"flink\":123\n"
+ + " },\n"
+ + " \"map2map\":{\n"
+ + " \"inner_map\":{\n"
+ + " \"key\":234\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " ],\n"
+ + " \"type\":\"INSERT\"\n"
+ + "}";
+ byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);
+ DeserializationSchema<Row> schema = DeserializationSchemaFactory.build(
+ fieldInfos,
+ new CanalDeserializationInfo(null, null, false, "ISO_8601", false)
+ );
+ ListCollector<Row> collector = new ListCollector<>();
+ schema.deserialize(testBytes, collector);
+ assertEquals(generateTestRow(), collector.getInnerList().get(0));
+ }
+
+ // Full canal record with three data rows. With metadata enabled (last ctor flag true)
+ // the schema appends database/table/sqlType/pkNames/ts/es columns after the data columns.
+ // A non-matching database filter must drop all rows.
+ @Test
+ public void testCanalDeserializationSchema() throws IOException, ClassNotFoundException {
+ String testCanalData = "{\n"
+ + " \"data\":[\n"
+ + " {\n"
+ + " \"id\":\"101\",\n"
+ + " \"name\":\"scooter\",\n"
+ + " \"description\":\"Small 2-wheel scooter\",\n"
+ + " \"weight\":\"3.14\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"id\":\"102\",\n"
+ + " \"name\":\"car battery\",\n"
+ + " \"description\":\"12V car battery\",\n"
+ + " \"weight\":\"8.1\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"id\":\"103\",\n"
+ + " \"name\":\"12-pack drill bits\",\n"
+ + " \"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\n"
+ + " \"weight\":\"0.8\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"database\":\"inventory\",\n"
+ + " \"es\":1589373515000,\n"
+ + " \"id\":3,\n"
+ + " \"isDdl\":false,\n"
+ + " \"mysqlType\":{\n"
+ + " \"id\":\"INTEGER\",\n"
+ + " \"name\":\"VARCHAR(255)\",\n"
+ + " \"description\":\"VARCHAR(512)\",\n"
+ + " \"weight\":\"FLOAT\"\n"
+ + " },\n"
+ + " \"old\":null,\n"
+ + " \"pkNames\":[\n"
+ + " \"id\"\n"
+ + " ],\n"
+ + " \"sql\":\"\",\n"
+ + " \"sqlType\":{\n"
+ + " \"id\":4,\n"
+ + " \"name\":12,\n"
+ + " \"description\":12,\n"
+ + " \"weight\":7\n"
+ + " },\n"
+ + " \"table\":\"products2\",\n"
+ + " \"ts\":1589373515477,\n"
+ + " \"type\":\"INSERT\"\n"
+ + "}";
+
+ byte[] testBytes = testCanalData.getBytes(StandardCharsets.UTF_8);
+ FieldInfo[] fieldInfos = new FieldInfo[]{
+ new FieldInfo("id", IntFormatInfo.INSTANCE),
+ new FieldInfo("name", StringFormatInfo.INSTANCE),
+ new FieldInfo("description", StringFormatInfo.INSTANCE),
+ new FieldInfo("weight", FloatFormatInfo.INSTANCE)
+ };
+
+ DeserializationSchema<Row> schemaWithoutFilter = DeserializationSchemaFactory.build(
+ fieldInfos,
+ new CanalDeserializationInfo(null, null, false, "ISO_8601", true)
+ );
+ ListCollector<Row> collector1 = new ListCollector<>();
+ schemaWithoutFilter.deserialize(testBytes, collector1);
+ List<Row> innerList = collector1.getInnerList();
+ assertEquals(3, innerList.size());
+ Row row = innerList.get(0);
+ assertEquals(101, row.getField(0));
+ assertEquals("scooter", row.getField(1).toString());
+ assertEquals("Small 2-wheel scooter", row.getField(2).toString());
+ assertEquals(3.14f, (Float) row.getField(3), 0);
+ assertEquals("inventory", row.getField(4).toString());
+ assertEquals("products2", row.getField(5).toString());
+ assertEquals(4, ((Map<?, ?>) row.getField(6)).size());
+ assertEquals("id", ((String[]) row.getField(7))[0]);
+ // "es" and "ts" are treated as local in flink-canal
+ assertEquals(1589373515477L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(8)).getMillisecond());
+ assertEquals(1589373515000L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(9)).getMillisecond());
+
+ DeserializationSchema<Row> schemaWithFilter = DeserializationSchemaFactory.build(
+ fieldInfos,
+ new CanalDeserializationInfo("NoExistDB", null, false, "ISO_8601", true)
+ );
+ ListCollector<Row> collector2 = new ListCollector<>();
+ schemaWithFilter.deserialize(testBytes, collector2);
+ assertTrue(collector2.getInnerList().isEmpty());
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapperTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapperTest.java
new file mode 100644
index 0000000..0a5dbd4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.ParseException;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CustomDateFormatDeserializationSchemaWrapperTest {
+
+ @Test
+ public void testFromStringToDateAndTime() throws IOException, ClassNotFoundException, ParseException {
+ FieldInfo[] fieldInfos = new FieldInfo[]{
+ new FieldInfo("f1", new DateFormatInfo()),
+ new FieldInfo("f2", new TimeFormatInfo()),
+ new FieldInfo("f3", new TimestampFormatInfo()),
+ new FieldInfo("f4", StringFormatInfo.INSTANCE)
+ };
+
+ DeserializationSchema<Row> stringSchema = DeserializationSchemaFactory.build(fieldInfos, null);
+ CustomDateFormatDeserializationSchemaWrapper schemaWrapper
+ = new CustomDateFormatDeserializationSchemaWrapper(stringSchema, extractFormatInfos(fieldInfos));
+
+ Row testRow = Row.of("2022-02-15", "15:52:30", "2022-02-15 15:52:30", "don't convert");
+ Row resultRow = schemaWrapper.fromStringToDateAndTime(testRow);
+ assertTrue(resultRow.getField(0) instanceof Date);
+ assertTrue(resultRow.getField(1) instanceof Time);
+ assertTrue(resultRow.getField(2) instanceof Timestamp);
+
+ final Row expectedRow = Row.of(Date.valueOf("2022-02-15"), Time.valueOf("15:52:30"),
+ Timestamp.valueOf("2022-02-15 15:52:30"), "don't convert");
+ assertEquals(expectedRow, resultRow);
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
new file mode 100644
index 0000000..0f88049
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.types.Row;
+import org.apache.inlong.commons.msg.InLongMsg;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+public class DeserializationFunctionTest {
+
+ @Test
+ public void testProcessElement() throws Exception {
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String testData = "testData";
+ inLongMsg.addMsg("m=12&iname=tid", testData.getBytes(StandardCharsets.UTF_8));
+ SerializedRecord serializedRecord = new SerializedRecord(1, inLongMsg.buildArray());
+
+ DeserializationFunction function = new DeserializationFunction(
+ DeserializationSchemaFactory.build(
+ new FieldInfo[]{new FieldInfo("content", StringFormatInfo.INSTANCE)}, null
+ )
+ );
+
+ ListCollector<Row> collector = new ListCollector<>();
+ function.processElement(serializedRecord,null, collector);
+ Row row = collector.getInnerList().get(0);
+ assertEquals(1, row.getArity());
+ assertEquals(testData, row.getField(0));
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/JsonDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/JsonDeserializationTest.java
new file mode 100644
index 0000000..7e57422
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/JsonDeserializationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class JsonDeserializationTest {
+
+ private final FieldInfo[] fieldInfos = new FieldInfo[]{
+ new FieldInfo("id", LongFormatInfo.INSTANCE),
+ new FieldInfo("name", StringFormatInfo.INSTANCE),
+ new FieldInfo("bytes", BinaryFormatInfo.INSTANCE),
+ new FieldInfo("date", new DateFormatInfo("yyyy-MM-dd")),
+ new FieldInfo("time", new TimeFormatInfo("HH:mm:ss")),
+ new FieldInfo("timestamp", new TimestampFormatInfo("yyyy-MM-dd HH:mm:ss")),
+ new FieldInfo("map",
+ new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
+ new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
+ new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
+ };
+
+ private Row generateTestRow() {
+ Row testRow = new Row(8);
+ testRow.setField(0, 1238123899121L);
+ testRow.setField(1, "testName");
+
+ byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
+ testRow.setField(2, bytes);
+
+ testRow.setField(3, Date.valueOf("1990-10-14"));
+ testRow.setField(4, Time.valueOf("12:12:43"));
+ testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+
+ Map<String, Long> map = new HashMap<>();
+ map.put("flink", 123L);
+ testRow.setField(6, map);
+
+ Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+ Map<String, Integer> innerMap = new HashMap<>();
+ innerMap.put("key", 234);
+ nestedMap.put("inner_map", innerMap);
+ testRow.setField(7, nestedMap);
+
+ return testRow;
+ }
+
+ @Test
+ public void testJsonDeserializationSchema() throws IOException, ClassNotFoundException {
+ Row expectedRow = generateTestRow();
+ ObjectMapper objectMapper = new ObjectMapper();
+ ObjectNode root = objectMapper.createObjectNode();
+ //noinspection ConstantConditions
+ root.put("id", (long) expectedRow.getField(0));
+ root.put("name", (String) expectedRow.getField(1));
+ root.put("bytes", (byte[]) expectedRow.getField(2));
+ root.put("date", "1990-10-14");
+ root.put("time", "12:12:43");
+ root.put("timestamp", "1990-10-14 12:12:43");
+ root.putObject("map").put("flink", 123);
+ root.putObject("map2map").putObject("inner_map").put("key", 234);
+
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ DeserializationSchema<Row> schema =
+ DeserializationSchemaFactory.build(fieldInfos, new JsonDeserializationInfo());
+
+ ListCollector<Row> collector = new ListCollector<>();
+ schema.deserialize(serializedJson, collector);
+
+ assertEquals(expectedRow, collector.getInnerList().get(0));
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
index 9e21991..1b98346 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
@@ -26,7 +26,9 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;
+import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -52,6 +54,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -61,10 +64,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
import static org.junit.Assert.assertNull;
-public abstract class KafkaSinkTestBase<T> {
+public abstract class KafkaSinkTestBase {
private static final Logger logger = LoggerFactory.getLogger(KafkaSinkTestBase.class);
@@ -79,13 +83,13 @@ public abstract class KafkaSinkTestBase<T> {
private AdminClient kafkaAdmin;
private KafkaConsumer<String, Bytes> kafkaConsumer;
private Properties kafkaClientProperties;
- protected String brokerConnStr;
+ private String brokerConnStr;
// prepare data below in subclass
protected String topic;
protected List<Row> testRows;
protected FieldInfo[] fieldInfos;
- protected SerializationSchema<T> serializationSchema;
+ protected SerializationSchema<Row> serializationSchema;
@Before
public void setup() throws Exception {
@@ -160,7 +164,7 @@ public abstract class KafkaSinkTestBase<T> {
"The topic metadata failed to propagate to Kafka broker.");
}
- protected abstract void prepareData();
+ protected abstract void prepareData() throws IOException, ClassNotFoundException;
@After
public void clean() throws IOException {
@@ -198,7 +202,14 @@ public abstract class KafkaSinkTestBase<T> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
try {
- buildJob(env, testingSource);
+ env.addSource(testingSource).addSink(
+ buildKafkaSink(
+ new KafkaSinkInfo(new FieldInfo[]{}, brokerConnStr, topic, null),
+ new HashMap<>(),
+ serializationSchema,
+ new Configuration()
+ )
+ );
env.execute();
testFinishedCountDownLatch.await();
} catch (Exception e) {
@@ -211,8 +222,6 @@ public abstract class KafkaSinkTestBase<T> {
testFinishedCountDownLatch.countDown();
}
- protected abstract void buildJob(StreamExecutionEnvironment env, TestingSource testingSource);
-
private void verify() throws Exception {
kafkaConsumer.subscribe(Collections.singleton(topic));
List<Bytes> results = new ArrayList<>();
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
deleted file mode 100644
index 9a612c7..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.kafka;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
-
-import java.util.HashMap;
-
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
-
-public abstract class KafkaSinkTestBaseForRow extends KafkaSinkTestBase<Row> {
-
- @Override
- protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
- env.addSource(testingSource).addSink(
- buildKafkaSink(
- new KafkaSinkInfo(new FieldInfo[]{}, brokerConnStr, topic, null),
- new HashMap<>(),
- serializationSchema,
- new Configuration()
- )
- );
- }
-
-}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
deleted file mode 100644
index bb17731..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.kafka;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
-
-import java.util.HashMap;
-
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
-
-public abstract class KafkaSinkTestBaseForRowData extends KafkaSinkTestBase<RowData> {
-
- @Override
- protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
- KafkaSinkInfo kafkaSinkInfo = new KafkaSinkInfo(fieldInfos, brokerConnStr, topic, null);
- DataFormatConverters.RowConverter rowConverter = createRowConverter(kafkaSinkInfo);
- env.addSource(testingSource).map(rowConverter::toInternal).returns(RowData.class).addSink(
- buildKafkaSink(kafkaSinkInfo, new HashMap<>(), serializationSchema, new Configuration())
- );
- }
-
-}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
index 82c7ee7..2a08417 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
@@ -34,7 +34,7 @@ import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
import org.apache.kafka.common.utils.Bytes;
import java.io.IOException;
@@ -47,10 +47,10 @@ import java.util.Map;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
import static org.junit.Assert.assertEquals;
-public class RowToAvroKafkaSinkTest extends KafkaSinkTestBaseForRow {
+public class RowToAvroKafkaSinkTest extends KafkaSinkTestBase {
@Override
- protected void prepareData() {
+ protected void prepareData() throws IOException, ClassNotFoundException {
fieldInfos = new FieldInfo[]{
new FieldInfo("f1", new StringFormatInfo()),
new FieldInfo("f2", new IntFormatInfo()),
@@ -68,7 +68,7 @@ public class RowToAvroKafkaSinkTest extends KafkaSinkTestBaseForRow {
))
};
topic = "test_kafka_row_to_avro";
- serializationSchema = RowSerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
+ serializationSchema = SerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
prepareTestRows();
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
index 8a56347..db7fb04 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
@@ -24,25 +24,26 @@ import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
import org.apache.kafka.common.utils.Bytes;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
-public class RowToCanalKafkaSinkTest extends KafkaSinkTestBaseForRowData {
+public class RowToCanalKafkaSinkTest extends KafkaSinkTestBase {
@Override
- protected void prepareData() {
+ protected void prepareData() throws IOException, ClassNotFoundException {
topic = "test_kafka_row_to_canal";
fieldInfos = new FieldInfo[]{
new FieldInfo("f1", new StringFormatInfo()),
new FieldInfo("f2", new IntFormatInfo())
};
- serializationSchema = RowDataSerializationSchemaFactory.build(
+ serializationSchema = SerializationSchemaFactory.build(
fieldInfos, new CanalSerializationInfo("sql", "literal", "null", true)
);
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
index 7239dc7..66c964b 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -26,9 +26,10 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
import org.apache.kafka.common.utils.Bytes;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
@@ -37,11 +38,11 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
-public class RowToJsonKafkaSinkTest extends KafkaSinkTestBaseForRow {
+public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
@Override
- protected void prepareData() {
+ protected void prepareData() throws IOException, ClassNotFoundException {
topic = "test_kafka_row_to_json";
- serializationSchema = RowSerializationSchemaFactory.build(
+ serializationSchema = SerializationSchemaFactory.build(
new FieldInfo[]{
new FieldInfo("f1", new StringFormatInfo()),
new FieldInfo("f2", new MapFormatInfo(new StringFormatInfo(), new DoubleFormatInfo())),
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
index dbab9b6..999f8b5 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
@@ -22,21 +22,22 @@ import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
import org.apache.kafka.common.utils.Bytes;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
-public class RowToStringKafkaSinkTest extends KafkaSinkTestBaseForRow {
+public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
@Override
- protected void prepareData() {
+ protected void prepareData() throws IOException, ClassNotFoundException {
topic = "test_kafka_row_to_string";
- serializationSchema = RowSerializationSchemaFactory.build(
+ serializationSchema = SerializationSchemaFactory.build(
new FieldInfo[]{
new FieldInfo("f1", new StringFormatInfo()),
new FieldInfo("f2", new DoubleFormatInfo())
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapperTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapperTest.java
new file mode 100644
index 0000000..15dca5e
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapperTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CustomDateFormatSerializationSchemaWrapperTest {
+
+ @Test
+ public void testFromDateAndTimeToString() throws IOException, ClassNotFoundException {
+ FieldInfo[] fieldInfos = new FieldInfo[]{
+ new FieldInfo("f1", new DateFormatInfo()),
+ new FieldInfo("f2", new TimeFormatInfo()),
+ new FieldInfo("f3", new TimestampFormatInfo()),
+ new FieldInfo("f4", IntFormatInfo.INSTANCE)
+ };
+
+ SerializationSchema<Row> stringSchema = SerializationSchemaFactory.build(fieldInfos, null);
+
+ CustomDateFormatSerializationSchemaWrapper schemaWrapper
+ = new CustomDateFormatSerializationSchemaWrapper(stringSchema, extractFormatInfos(fieldInfos));
+
+ Row testRow = Row.of(Date.valueOf("2022-02-15"), Time.valueOf("15:52:30"),
+ Timestamp.valueOf("2022-02-15 15:52:30"), 1);
+
+ Row resultRow = schemaWrapper.fromDateAndTimeToString(testRow);
+ assertTrue(resultRow.getField(0) instanceof String);
+ assertTrue(resultRow.getField(1) instanceof String);
+ assertTrue(resultRow.getField(2) instanceof String);
+
+ Row expectedRow = Row.of("2022-02-15", "15:52:30", "2022-02-15 15:52:30", 1);
+ assertEquals(expectedRow, resultRow);
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
index c2894e9..00d3b79 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
@@ -38,8 +38,10 @@ import java.io.IOException;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.deepCopy;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class CommonUtilsTest {
@@ -296,4 +298,19 @@ public class CommonUtilsTest {
assertEquals(expectedJsonNode, actualJsonNode);
}
+
+ @Test
+ public void testDeepCopy() throws IOException, ClassNotFoundException {
+ FieldInfo[] fieldInfos = new FieldInfo[]{
+ new FieldInfo("f1", StringFormatInfo.INSTANCE),
+ new FieldInfo("f2", IntFormatInfo.INSTANCE)
+ };
+
+ FieldInfo[] copiedInfos = (FieldInfo[]) deepCopy(fieldInfos);
+ assertArrayEquals(fieldInfos, copiedInfos);
+
+ fieldInfos[0].setFormatInfo(IntFormatInfo.INSTANCE);
+ assertTrue(copiedInfos[0].getFormatInfo() instanceof StringFormatInfo);
+ }
+
}