You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/16 11:10:33 UTC
[incubator-inlong] branch master updated: [INLONG-2346][Feature][InLong-Sort] Support avro and canal formats for sort sink (#2387)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 317ee86 [INLONG-2346][Feature][InLong-Sort] Support avro and canal formats for sort sink (#2387)
317ee86 is described below
commit 317ee8630afe3d1d86c96d0b7e063af44a12fdd5
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Wed Feb 16 19:10:28 2022 +0800
[INLONG-2346][Feature][InLong-Sort] Support avro and canal formats for sort sink (#2387)
---
inlong-sort/pom.xml | 54 ++---
.../inlong/sort/configuration/Constants.java | 2 +
.../serialization/AvroSerializationInfo.java | 8 +
.../serialization/CanalSerializationInfo.java | 75 ++++++
.../inlong/sort/protocol/sink/KafkaSinkInfo.java | 6 +-
.../serialization/AvroSerializationInfoTest.java | 36 +++
.../serialization/CanalSerializationInfoTest.java | 38 ++++
inlong-sort/sort-dist/pom.xml | 39 ++++
.../inlong/sort/formats/base/TableFormatUtils.java | 22 +-
.../sort/formats/common/BinaryFormatInfo.java | 63 +++++
.../inlong/sort/formats/common/BinaryTypeInfo.java | 46 ++++
.../inlong/sort/formats/common/FormatInfo.java | 4 +-
.../inlong/sort/formats/common/NullFormatInfo.java | 51 +++++
.../inlong/sort/formats/common/NullTypeInfo.java | 46 ++++
.../sort/formats/common/TimestampFormatInfo.java | 4 +-
.../inlong/sort/formats/common/TypeInfo.java | 4 +-
.../sort/formats/common/BinaryFormatInfoTest.java | 53 +++++
.../formats/common/TimestampFormatInfoTest.java | 1 +
.../sort-formats/format-inlongmsg-base/pom.xml | 1 +
inlong-sort/sort-single-tenant/pom.xml | 68 ++++++
.../inlong/sort/singletenant/flink/Entrance.java | 8 +-
.../singletenant/flink/kafka/KafkaSinkBuilder.java | 50 +++-
.../RowDataSerializationSchemaFactory.java | 92 ++++++++
.../RowSerializationSchemaFactory.java | 21 +-
.../sort/singletenant/flink/utils/CommonUtils.java | 47 ++++
.../flink/kafka/KafkaSinkTestBase.java | 42 ++--
.../flink/kafka/KafkaSinkTestBaseForRow.java | 45 ++++
.../flink/kafka/KafkaSinkTestBaseForRowData.java | 43 ++++
.../flink/kafka/RowToAvroKafkaSinkTest.java | 127 +++++++++++
.../flink/kafka/RowToCanalKafkaSinkTest.java | 87 +++++++
.../flink/kafka/RowToJsonKafkaSinkTest.java | 36 +--
.../flink/kafka/RowToStringKafkaSinkTest.java | 34 +--
.../singletenant/flink/utils/CommonUtilsTest.java | 253 +++++++++++++++++++++
33 files changed, 1386 insertions(+), 120 deletions(-)
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 14cbd4f..d242dd2 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -168,6 +168,12 @@
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
@@ -185,6 +191,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
@@ -372,6 +384,12 @@
<artifactId>kafka_${flink.scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -420,45 +438,13 @@
</argLine>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.4</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <finalName>${project.artifactId}-${project.version}</finalName>
- <filters>
- <!-- Globally exclude log4j.properties from our JAR files. -->
- <filter>
- <artifact>*</artifact>
- <excludes>
- <exclude>log4j.properties</exclude>
- <exclude>log4j-test.properties</exclude>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index c6e8067..a0ad42b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -57,6 +57,8 @@ public class Constants {
public static final String DESERIALIZATION_SCHEMA_UID = "deserialization_schema_uid";
+ public static final String CONVERTER_UID = "converter_uid";
+
public static final String SINK_UID = "sink_uid";
/**
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java
index c9a82cf..56c6d72 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java
@@ -21,4 +21,12 @@ public class AvroSerializationInfo implements SerializationInfo {
private static final long serialVersionUID = 8446721117598285868L;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ return o != null && getClass() == o.getClass();
+ }
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java
index 633b10b..5849114 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java
@@ -17,8 +17,83 @@
package org.apache.inlong.sort.protocol.serialization;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class CanalSerializationInfo implements SerializationInfo {
private static final long serialVersionUID = 479443152335788151L;
+ @JsonProperty("timestamp_format_standard")
+ private final String timestampFormatStandard;
+
+ @JsonProperty("map_null_key_mod")
+ private final String mapNullKeyMod;
+
+ @JsonProperty("map_null_key_literal")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final String mapNullKeyLiteral;
+
+ @JsonProperty("encode_decimal_as_plain_number")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final boolean encodeDecimalAsPlainNumber;
+
+ @JsonCreator
+ public CanalSerializationInfo(
+ @JsonProperty("timestamp_format_standard") String timestampFormatStandard,
+ @JsonProperty("map_null_key_mod") String mapNullKeyMod,
+ @JsonProperty("map_null_key_literal") String mapNullKeyLiteral,
+ @JsonProperty("encode_decimal_as_plain_number") boolean encodeDecimalAsPlainNumber
+ ) {
+ this.timestampFormatStandard = checkNotNull(timestampFormatStandard).toUpperCase();
+ this.mapNullKeyMod = checkNotNull(mapNullKeyMod).toUpperCase();
+ this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ }
+
+ @JsonProperty("timestamp_format_standard")
+ public String getTimestampFormatStandard() {
+ return timestampFormatStandard;
+ }
+
+ @JsonProperty("map_null_key_mod")
+ public String getMapNullKeyMod() {
+ return mapNullKeyMod;
+ }
+
+ @JsonProperty("map_null_key_literal")
+ public String getMapNullKeyLiteral() {
+ return mapNullKeyLiteral;
+ }
+
+ @JsonProperty("encode_decimal_as_plain_number")
+ public boolean isEncodeDecimalAsPlainNumber() {
+ return encodeDecimalAsPlainNumber;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CanalSerializationInfo that = (CanalSerializationInfo) o;
+ return encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber
+ && Objects.equals(timestampFormatStandard, that.timestampFormatStandard)
+ && Objects.equals(mapNullKeyMod, that.mapNullKeyMod)
+ && Objects.equals(mapNullKeyLiteral, that.mapNullKeyLiteral);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestampFormatStandard, mapNullKeyMod, mapNullKeyLiteral, encodeDecimalAsPlainNumber);
+ }
+
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java
index b4922f5..90f55c1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java
@@ -24,6 +24,8 @@ import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import java.util.Objects;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class KafkaSinkInfo extends SinkInfo {
private static final long serialVersionUID = 161617117094475954L;
@@ -45,8 +47,8 @@ public class KafkaSinkInfo extends SinkInfo {
@JsonProperty("serialization_info") SerializationInfo serializationInfo
) {
super(fields);
- this.address = address;
- this.topic = topic;
+ this.address = checkNotNull(address);
+ this.topic = checkNotNull(topic);
this.serializationInfo = serializationInfo;
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfoTest.java
new file mode 100644
index 0000000..0a1e175
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfoTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.serialization;
+
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
+
+public class AvroSerializationInfoTest extends ProtocolBaseTest {
+
+ @Override
+ public void init() {
+ expectedObject = new AvroSerializationInfo();
+
+ expectedJson = "{\"type\":\"avro\"}";
+
+ equalObj1 = expectedObject;
+ equalObj2 = new AvroSerializationInfo();
+ unequalObj = null;
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfoTest.java
new file mode 100644
index 0000000..b0ad5fb
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfoTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.serialization;
+
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
+
+public class CanalSerializationInfoTest extends ProtocolBaseTest {
+
+ @Override
+ public void init() {
+ expectedObject = new CanalSerializationInfo("Sql", "Literal", null, false);
+ expectedJson = "{\n"
+ + " \"type\" : \"canal\",\n"
+ + " \"timestamp_format_standard\" : \"Sql\",\n"
+ + " \"map_null_key_mod\" : \"Literal\"\n"
+ + "}";
+ equalObj1 = expectedObject;
+ equalObj2 = new CanalSerializationInfo("Sql", "Literal", null, false);
+ unequalObj = new CanalSerializationInfo("Sql", "Literal", null, true);
+ }
+
+}
diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml
index b5538da..a4646f1 100644
--- a/inlong-sort/sort-dist/pom.xml
+++ b/inlong-sort/sort-dist/pom.xml
@@ -48,4 +48,43 @@
<version>${project.version}</version>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.4</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <filters>
+ <!-- Globally exclude log4j.properties from our JAR files. -->
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>log4j.properties</exclude>
+ <exclude>log4j-test.properties</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index d3abb01..40de211 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -47,6 +48,7 @@ import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimeType;
@@ -57,6 +59,8 @@ import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
import org.apache.inlong.sort.formats.common.ArrayTypeInfo;
import org.apache.inlong.sort.formats.common.BasicFormatInfo;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.BinaryTypeInfo;
import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
import org.apache.inlong.sort.formats.common.BooleanTypeInfo;
import org.apache.inlong.sort.formats.common.ByteFormatInfo;
@@ -77,6 +81,7 @@ import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.LongTypeInfo;
import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.MapTypeInfo;
+import org.apache.inlong.sort.formats.common.NullFormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.common.RowTypeInfo;
import org.apache.inlong.sort.formats.common.ShortFormatInfo;
@@ -94,6 +99,9 @@ import org.apache.inlong.sort.formats.common.TypeInfo;
*/
public class TableFormatUtils {
+ // to support avro format, precision must be less than 3
+ private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
+
/**
* Returns the {@link DeserializationSchema} described by the given
* properties.
@@ -299,6 +307,10 @@ public class TableFormatUtils {
}
return new RowFormatInfo(fieldNames, fieldFormatInfos);
+ } else if (logicalType instanceof BinaryType) {
+ return BinaryFormatInfo.INSTANCE;
+ } else if (logicalType instanceof NullType) {
+ return NullFormatInfo.INSTANCE;
} else {
throw new UnsupportedOperationException();
}
@@ -331,7 +343,7 @@ public class TableFormatUtils {
} else if (formatInfo instanceof DateFormatInfo) {
return new DateType();
} else if (formatInfo instanceof TimestampFormatInfo) {
- return new TimestampType();
+ return new TimestampType(DEFAULT_PRECISION_FOR_TIMESTAMP);
} else if (formatInfo instanceof ArrayFormatInfo) {
FormatInfo elementFormatInfo = ((ArrayFormatInfo) formatInfo).getElementFormatInfo();
return new ArrayType(deriveLogicalType(elementFormatInfo));
@@ -350,6 +362,10 @@ public class TableFormatUtils {
logicalTypes[i] = deriveLogicalType(formatInfos[i]);
}
return RowType.of(logicalTypes, rowFormatInfo.getFieldNames());
+ } else if (formatInfo instanceof BinaryFormatInfo) {
+ return new BinaryType();
+ } else if (formatInfo instanceof NullFormatInfo) {
+ return new NullType();
} else {
throw new UnsupportedOperationException();
}
@@ -386,6 +402,8 @@ public class TableFormatUtils {
return Types.SQL_TIME;
} else if (typeInfo instanceof TimestampTypeInfo) {
return Types.SQL_TIMESTAMP;
+ } else if (typeInfo instanceof BinaryTypeInfo) {
+ return Types.PRIMITIVE_ARRAY(Types.BYTE);
} else if (typeInfo instanceof ArrayTypeInfo) {
ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) typeInfo;
TypeInfo elementTypeInfo =
@@ -414,7 +432,7 @@ public class TableFormatUtils {
return Types.ROW_NAMED(fieldNames, fieldTypes);
} else {
- throw new IllegalStateException("Unexpected format.");
+ throw new IllegalStateException("Unexpected type info " + typeInfo + ".");
}
}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
new file mode 100644
index 0000000..b8e5774
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import java.nio.charset.StandardCharsets;
+
+public class BinaryFormatInfo implements BasicFormatInfo<byte[]> {
+
+ private static final long serialVersionUID = 8379022656220694709L;
+
+ public static final BinaryFormatInfo INSTANCE = new BinaryFormatInfo();
+
+ @Override
+ public String serialize(byte[] record) throws Exception {
+ return new String(record, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] deserialize(String text) throws Exception {
+ return text.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public TypeInfo getTypeInfo() {
+ return BinaryTypeInfo.INSTANCE;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ return o != null && getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "BinaryFormatInfo";
+ }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryTypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryTypeInfo.java
new file mode 100644
index 0000000..a1ab577
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryTypeInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+public class BinaryTypeInfo implements TypeInfo {
+
+ private static final long serialVersionUID = -5950082561854221079L;
+
+ public static final BinaryTypeInfo INSTANCE = new BinaryTypeInfo();
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ return o != null && getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "BinaryTypeInfo";
+ }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
index a443e54..79b8411 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
@@ -42,7 +42,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@JsonSubTypes.Type(name = "timestamp", value = TimestampFormatInfo.class),
@JsonSubTypes.Type(name = "array", value = ArrayFormatInfo.class),
@JsonSubTypes.Type(name = "map", value = MapFormatInfo.class),
- @JsonSubTypes.Type(name = "row", value = RowFormatInfo.class)
+ @JsonSubTypes.Type(name = "row", value = RowFormatInfo.class),
+ @JsonSubTypes.Type(name = "binary", value = BinaryFormatInfo.class),
+ @JsonSubTypes.Type(name = "null", value = NullFormatInfo.class)
})
public interface FormatInfo extends Serializable {
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullFormatInfo.java
new file mode 100644
index 0000000..baf148a
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullFormatInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+public class NullFormatInfo implements FormatInfo {
+
+ private static final long serialVersionUID = 4672953842346988594L;
+
+ public static final NullFormatInfo INSTANCE = new NullFormatInfo();
+
+ @Override
+ public TypeInfo getTypeInfo() {
+ return NullTypeInfo.INSTANCE;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ return o != null && getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "NullFormatInfo";
+ }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullTypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullTypeInfo.java
new file mode 100644
index 0000000..5c5e59c
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullTypeInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+public class NullTypeInfo implements TypeInfo {
+
+ private static final long serialVersionUID = -8808899337434251627L;
+
+ public static final NullTypeInfo INSTANCE = new NullTypeInfo();
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ return o != null && getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "NullTypeInfo";
+ }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
index 69b0882..ac7c307 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
@@ -54,8 +54,8 @@ public class TimestampFormatInfo implements BasicFormatInfo<Timestamp> {
this.format = format;
if (!format.equals("MICROS")
- && !format.equals("MILLIS")
- && !format.equals("SECONDS")) {
+ && !format.equals("MILLIS")
+ && !format.equals("SECONDS")) {
this.simpleDateFormat = new SimpleDateFormat(format);
} else {
this.simpleDateFormat = null;
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
index fdadbcf..626f620 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
@@ -41,7 +41,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@JsonSubTypes.Type(name = "timestamp", value = TimestampTypeInfo.class),
@JsonSubTypes.Type(name = "array", value = ArrayTypeInfo.class),
@JsonSubTypes.Type(name = "map", value = MapTypeInfo.class),
- @JsonSubTypes.Type(name = "row", value = RowTypeInfo.class)
+ @JsonSubTypes.Type(name = "row", value = RowTypeInfo.class),
+ @JsonSubTypes.Type(name = "binary", value = BinaryTypeInfo.class),
+ @JsonSubTypes.Type(name = "null", value = NullTypeInfo.class)
})
public interface TypeInfo extends Serializable {
diff --git a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/BinaryFormatInfoTest.java b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/BinaryFormatInfoTest.java
new file mode 100644
index 0000000..338ad71
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/BinaryFormatInfoTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class BinaryFormatInfoTest extends FormatInfoTestBase {
+
+ @Override
+ Collection<FormatInfo> createFormatInfos() {
+ return Collections.singletonList(BinaryFormatInfo.INSTANCE);
+ }
+
+ @Test
+ public void testSerialize() throws Exception {
+ assertEquals(
+ "testString",
+ BinaryFormatInfo.INSTANCE.serialize("testString".getBytes(StandardCharsets.UTF_8))
+ );
+ }
+
+ @Test
+ public void testDeserialize() throws Exception {
+ assertArrayEquals(
+ "testString".getBytes(StandardCharsets.UTF_8),
+ BinaryFormatInfo.INSTANCE.deserialize("testString")
+ );
+ }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java
index 8e764b9..97e002a 100644
--- a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java
+++ b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java
@@ -24,6 +24,7 @@ import java.sql.Timestamp;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
+
import org.junit.Test;
/**
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index e6f0d95..3a97cad 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -106,6 +106,7 @@
<goal>shade</goal>
</goals>
<configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
<relocations>
<relocation>
<pattern>com.google.common</pattern>
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index c2d8115..cb8e925 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -45,6 +45,12 @@
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connectors</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -74,11 +80,23 @@
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>0.12.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -91,6 +109,17 @@
<artifactId>flink-json</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test -->
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -125,4 +154,43 @@
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.4</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <filters>
+ <!-- Globally exclude log4j.properties from our JAR files. -->
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>log4j.properties</exclude>
+ <exclude>log4j-test.properties</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index cfbd134..330cf72 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.singletenant.flink;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSinkStream;
import com.google.common.base.Preconditions;
import java.io.File;
@@ -159,11 +159,7 @@ public class Entrance {
.build();
break;
case Constants.SINK_TYPE_KAFKA:
- sourceStream
- .addSink(buildKafkaSink((KafkaSinkInfo) sinkInfo, properties, config))
- .uid(Constants.SINK_UID)
- .name("Kafka Sink")
- .setParallelism(sinkParallelism);
+ buildKafkaSinkStream(sourceStream, (KafkaSinkInfo) sinkInfo, properties, config, sinkParallelism);
break;
default:
throw new IllegalArgumentException("Unsupported sink type " + sinkType);
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
index 01faf32..798a69f 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
@@ -18,31 +18,38 @@
package org.apache.inlong.sort.singletenant.flink.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Map;
import java.util.Properties;
import static org.apache.inlong.sort.configuration.Constants.SINK_KAFKA_PRODUCER_POOL_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
public class KafkaSinkBuilder {
- public static SinkFunction<Row> buildKafkaSink(
+ public static <T> SinkFunction<T> buildKafkaSink(
KafkaSinkInfo kafkaSinkInfo,
Map<String, Object> properties,
+ SerializationSchema<T> serializationSchema,
Configuration config
) {
String topic = kafkaSinkInfo.getTopic();
Properties producerProperties = buildProducerProperties(properties, kafkaSinkInfo.getAddress());
- SerializationSchema<Row> serializationSchema =
- RowSerializationSchemaFactory.build(kafkaSinkInfo, kafkaSinkInfo.getSerializationInfo());
return new FlinkKafkaProducer<>(
topic,
@@ -60,4 +67,39 @@ public class KafkaSinkBuilder {
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
return producerProperties;
}
+
+ public static void buildKafkaSinkStream(
+ DataStream<Row> sourceStream,
+ KafkaSinkInfo kafkaSinkInfo,
+ Map<String, Object> properties,
+ Configuration config,
+ int sinkParallelism
+ ) {
+ SerializationInfo serializationInfo = kafkaSinkInfo.getSerializationInfo();
+ if (serializationInfo instanceof CanalSerializationInfo) {
+ DataFormatConverters.RowConverter rowConverter = createRowConverter(kafkaSinkInfo);
+ DataStream<RowData> dataStream = sourceStream
+ .map(rowConverter::toInternal)
+ .uid(Constants.CONVERTER_UID)
+ .name("Row to RowData Converter")
+ .setParallelism(sinkParallelism);
+
+ SerializationSchema<RowData> schema = RowDataSerializationSchemaFactory.build(
+ kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
+
+ dataStream
+ .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
+ .uid(Constants.SINK_UID)
+ .name("Kafka Sink")
+ .setParallelism(sinkParallelism);
+ } else {
+ SerializationSchema<Row> schema = RowSerializationSchemaFactory.build(
+ kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
+ sourceStream
+ .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
+ .uid(Constants.SINK_UID)
+ .name("Kafka Sink")
+ .setParallelism(sinkParallelism);
+ }
+ }
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
new file mode 100644
index 0000000..69d7378
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.canal.CanalJsonSerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
+
+public class RowDataSerializationSchemaFactory {
+
+ private static final String CANAL_TIMESTAMP_STANDARD_SQL = "SQL";
+ private static final String CANAL_TIMESTAMP_STANDARD_ISO = "ISO_8601";
+ private static final String CANAL_MAP_NULL_KEY_MODE_FAIL = "FAIL";
+ private static final String CANAL_MAP_NULL_KEY_MODE_DROP = "DROP";
+ private static final String CANAL_MAP_NULL_KEY_MODE_LITERAL = "LITERAL";
+
+ private static final String CANAL_MAP_NULL_KEY_LITERAL_DEFAULT = "null";
+
+ public static SerializationSchema<RowData> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
+ if (serializationInfo instanceof CanalSerializationInfo) {
+ return buildCanalRowDataSerializationSchema(fieldInfos, (CanalSerializationInfo) serializationInfo);
+ }
+
+ throw new IllegalArgumentException("Unsupported RowData serialization info: " + serializationInfo);
+ }
+
+ private static SerializationSchema<RowData> buildCanalRowDataSerializationSchema(
+ FieldInfo[] fieldInfos,
+ CanalSerializationInfo canalSerializationInfo
+ ) {
+ String mapNullKeyLiteral = canalSerializationInfo.getMapNullKeyLiteral();
+ if (StringUtils.isEmpty(mapNullKeyLiteral)) {
+ mapNullKeyLiteral = CANAL_MAP_NULL_KEY_LITERAL_DEFAULT;
+ }
+
+ RowType rowType = convertFieldInfosToRowType(fieldInfos);
+ return new CanalJsonSerializationSchema(
+ rowType,
+ getTimestampFormatStandard(canalSerializationInfo.getTimestampFormatStandard()),
+ getMapNullKeyMode(canalSerializationInfo.getMapNullKeyMod()),
+ mapNullKeyLiteral,
+ canalSerializationInfo.isEncodeDecimalAsPlainNumber()
+ );
+ }
+
+ private static TimestampFormat getTimestampFormatStandard(String input) {
+ if (CANAL_TIMESTAMP_STANDARD_SQL.equals(input)) {
+ return TimestampFormat.SQL;
+ } else if (CANAL_TIMESTAMP_STANDARD_ISO.equals(input)) {
+ return TimestampFormat.ISO_8601;
+ }
+
+ throw new IllegalArgumentException("Unsupported timestamp format standard: " + input);
+ }
+
+ private static JsonOptions.MapNullKeyMode getMapNullKeyMode(String input) {
+ if (CANAL_MAP_NULL_KEY_MODE_FAIL.equals(input)) {
+ return JsonOptions.MapNullKeyMode.FAIL;
+ } else if (CANAL_MAP_NULL_KEY_MODE_DROP.equals(input)) {
+ return JsonOptions.MapNullKeyMode.DROP;
+ } else if (CANAL_MAP_NULL_KEY_MODE_LITERAL.equals(input)) {
+ return JsonOptions.MapNullKeyMode.LITERAL;
+ }
+
+ throw new IllegalArgumentException("Unsupported map null key mode: " + input);
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
index a03f037..b2eb961 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
@@ -19,22 +19,26 @@ package org.apache.inlong.sort.singletenant.flink.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
import java.nio.charset.StandardCharsets;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
public class RowSerializationSchemaFactory {
- public static SerializationSchema<Row> build(SinkInfo sinkInfo, SerializationInfo serializationInfo) {
+ public static SerializationSchema<Row> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
if (serializationInfo instanceof JsonSerializationInfo) {
- return buildJsonRowSerializationSchema(sinkInfo.getFields());
+ return buildJsonRowSerializationSchema(fieldInfos);
+ } else if (serializationInfo instanceof AvroSerializationInfo) {
+ return buildAvroRowSerializationSchema(fieldInfos);
} else {
return buildStringRowSerializationSchema();
}
@@ -46,14 +50,9 @@ public class RowSerializationSchemaFactory {
return builder.withTypeInfo(rowTypeInfo).build();
}
- private static SerializationSchema<Row> buildCanalRowSerializationSchema(SerializationInfo serializationInfo) {
- // TODO
- return null;
- }
-
- private static SerializationSchema<Row> buildAvroRowSerializationSchema(SerializationInfo serializationInfo) {
- // TODO
- return null;
+ private static SerializationSchema<Row> buildAvroRowSerializationSchema(FieldInfo[] fieldInfos) {
+ String avroSchemaInJson = buildAvroRecordSchemaInJson(fieldInfos);
+ return new AvroRowSerializationSchema(avroSchemaInJson);
}
private static SerializationSchema<Row> buildStringRowSerializationSchema() {
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index b3be3ef..0ee5b7a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -18,14 +18,24 @@
package org.apache.inlong.sort.singletenant.flink.utils;
+import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.TableSchema.Builder;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.common.TypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
+import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema;
+import static org.apache.inlong.sort.formats.base.TableFormatUtils.deriveLogicalType;
+
public class CommonUtils {
public static TableSchema getTableSchema(SinkInfo sinkInfo) {
@@ -57,4 +67,41 @@ public class CommonUtils {
return new org.apache.flink.api.java.typeutils.RowTypeInfo(typeInformationArray, fieldNames);
}
+
+ public static String buildAvroRecordSchemaInJson(FieldInfo[] fieldInfos) {
+ int fieldLength = fieldInfos.length;
+ String[] fieldNames = new String[fieldLength];
+ FormatInfo[] fieldFormatInfos = new FormatInfo[fieldLength];
+ for (int i = 0; i < fieldLength; i++) {
+ fieldNames[i] = fieldInfos[i].getName();
+ fieldFormatInfos[i] = fieldInfos[i].getFormatInfo();
+ }
+
+ RowFormatInfo rowFormatInfo = new RowFormatInfo(fieldNames, fieldFormatInfos);
+ LogicalType logicalType = deriveLogicalType(rowFormatInfo);
+ Schema schema = convertToSchema(logicalType);
+
+ if (schema.isUnion()) {
+ return schema.getTypes().get(1).toString();
+ }
+ return schema.toString();
+ }
+
+ public static DataFormatConverters.RowConverter createRowConverter(SinkInfo sinkInfo) {
+ DataType[] fieldDataTypes = getTableSchema(sinkInfo).getFieldDataTypes();
+ return new DataFormatConverters.RowConverter(fieldDataTypes);
+ }
+
+ public static RowType convertFieldInfosToRowType(FieldInfo[] fieldInfos) {
+ int fieldLength = fieldInfos.length;
+ String[] fieldNames = new String[fieldLength];
+ LogicalType[] fieldLogicalTypes = new LogicalType[fieldLength];
+ for (int i = 0; i < fieldLength; i++) {
+ fieldNames[i] = fieldInfos[i].getName();
+ fieldLogicalTypes[i] = TableFormatUtils.deriveLogicalType(fieldInfos[i].getFormatInfo());
+ }
+
+ return RowType.of(fieldLogicalTypes, fieldNames);
+ }
+
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
index e4ffcc8..9e21991 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
@@ -21,14 +21,12 @@ package org.apache.inlong.sort.singletenant.flink.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;
-import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -36,7 +34,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Before;
@@ -52,7 +52,6 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -62,11 +61,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
import static org.junit.Assert.assertNull;
-public abstract class KafkaSinkTestBase {
+public abstract class KafkaSinkTestBase<T> {
private static final Logger logger = LoggerFactory.getLogger(KafkaSinkTestBase.class);
@@ -78,16 +76,16 @@ public abstract class KafkaSinkTestBase {
private TestingServer zkServer;
private KafkaServer kafkaServer;
- private String brokerConnStr;
private AdminClient kafkaAdmin;
- private KafkaConsumer<String, String> kafkaConsumer;
+ private KafkaConsumer<String, Bytes> kafkaConsumer;
private Properties kafkaClientProperties;
+ protected String brokerConnStr;
// prepare data below in subclass
protected String topic;
protected List<Row> testRows;
protected FieldInfo[] fieldInfos;
- protected SerializationInfo serializationInfo;
+ protected SerializationSchema<T> serializationSchema;
@Before
public void setup() throws Exception {
@@ -150,7 +148,7 @@ public abstract class KafkaSinkTestBase {
kafkaClientProperties.setProperty("auto.offset.reset", "earliest");
kafkaClientProperties.setProperty("max.poll.records", "1000");
kafkaClientProperties.setProperty("key.deserializer", StringDeserializer.class.getName());
- kafkaClientProperties.setProperty("value.deserializer", StringDeserializer.class.getName());
+ kafkaClientProperties.setProperty("value.deserializer", BytesDeserializer.class.getName());
}
private void addTopic() throws InterruptedException, TimeoutException, ExecutionException {
@@ -192,25 +190,19 @@ public abstract class KafkaSinkTestBase {
}
@Test(timeout = 3 * 60 * 1000)
- public void testKafkaSink() throws InterruptedException {
+ public void testKafkaSink() throws Exception {
TestingSource testingSource = createTestingSource();
final ExecutorService executorService = Executors.newSingleThreadExecutor();
CountDownLatch testFinishedCountDownLatch = new CountDownLatch(1);
executorService.execute(() -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.addSource(testingSource).addSink(
- buildKafkaSink(
- new KafkaSinkInfo(fieldInfos, brokerConnStr, topic, serializationInfo),
- new HashMap<>(),
- new Configuration()
- )
- );
try {
+ buildJob(env, testingSource);
env.execute();
testFinishedCountDownLatch.await();
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("Error occurred when executing flink test job: ", e);
}
});
@@ -219,13 +211,15 @@ public abstract class KafkaSinkTestBase {
testFinishedCountDownLatch.countDown();
}
- private void verify() throws InterruptedException {
+ protected abstract void buildJob(StreamExecutionEnvironment env, TestingSource testingSource);
+
+ private void verify() throws Exception {
kafkaConsumer.subscribe(Collections.singleton(topic));
- List<String> results = new ArrayList<>();
+ List<Bytes> results = new ArrayList<>();
while (true) {
- ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
+ ConsumerRecords<String, Bytes> records = kafkaConsumer.poll(Duration.ofSeconds(1));
if (!records.isEmpty()) {
- for (ConsumerRecord<String, String> record : records) {
+ for (ConsumerRecord<String, Bytes> record : records) {
assertNull(record.key());
results.add(record.value());
}
@@ -244,7 +238,7 @@ public abstract class KafkaSinkTestBase {
}
}
- protected abstract void verifyData(List<String> results);
+ protected abstract void verifyData(List<Bytes> results) throws IOException;
private TestingSource createTestingSource() {
TestingSource testingSource = new TestingSource();
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
new file mode 100644
index 0000000..9a612c7
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+
+import java.util.HashMap;
+
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+
+public abstract class KafkaSinkTestBaseForRow extends KafkaSinkTestBase<Row> {
+
+ @Override
+ protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
+ env.addSource(testingSource).addSink(
+ buildKafkaSink(
+ new KafkaSinkInfo(new FieldInfo[]{}, brokerConnStr, topic, null),
+ new HashMap<>(),
+ serializationSchema,
+ new Configuration()
+ )
+ );
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
new file mode 100644
index 0000000..bb17731
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+
+import java.util.HashMap;
+
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
+
+public abstract class KafkaSinkTestBaseForRowData extends KafkaSinkTestBase<RowData> {
+
+ @Override
+ protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
+ KafkaSinkInfo kafkaSinkInfo = new KafkaSinkInfo(fieldInfos, brokerConnStr, topic, null);
+ DataFormatConverters.RowConverter rowConverter = createRowConverter(kafkaSinkInfo);
+ env.addSource(testingSource).map(rowConverter::toInternal).returns(RowData.class).addSink(
+ buildKafkaSink(kafkaSinkInfo, new HashMap<>(), serializationSchema, new Configuration())
+ );
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
new file mode 100644
index 0000000..82c7ee7
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.NullFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
+import static org.junit.Assert.assertEquals;
+
+public class RowToAvroKafkaSinkTest extends KafkaSinkTestBaseForRow {
+
+ @Override
+ protected void prepareData() {
+ fieldInfos = new FieldInfo[]{
+ new FieldInfo("f1", new StringFormatInfo()),
+ new FieldInfo("f2", new IntFormatInfo()),
+ new FieldInfo("f3", new NullFormatInfo()),
+ new FieldInfo("f4", new BinaryFormatInfo()),
+ new FieldInfo("f5", new MapFormatInfo(
+ new StringFormatInfo(),
+ new RowFormatInfo(
+ new String[]{"f51", "f52"},
+ new FormatInfo[]{
+ new IntFormatInfo(),
+ new ArrayFormatInfo(new DoubleFormatInfo())
+ }
+ )
+ ))
+ };
+ topic = "test_kafka_row_to_avro";
+ serializationSchema = RowSerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
+
+ prepareTestRows();
+ }
+
+ private void prepareTestRows() {
+ testRows = new ArrayList<>();
+ Map<String, Row> map1 = new HashMap<>();
+ Double[] doubles1 = new Double[]{1.0, 2.0, 3.0};
+ map1.put("AnnaMap", Row.of(1, doubles1));
+ testRows.add(Row.of("Anna", 100, null, new byte[]{1, 2, 3}, map1));
+
+ Map<String, Row> map2 = new HashMap<>();
+ Double[] doubles2 = new Double[]{4.0, 5.0, 6.0};
+ map2.put("LisaMap", Row.of(2, doubles2));
+ testRows.add(Row.of("Lisa", 50, null, new byte[]{4, 5, 6}, map2));
+
+ Map<String, Row> map3 = new HashMap<>();
+ Double[] doubles3 = new Double[]{7.0, 8.0, 9.0};
+ map3.put("BobMap", Row.of(3, doubles3));
+ testRows.add(Row.of("Bob", 10, null, new byte[]{7, 8, 9}, map3));
+ }
+
+ @Override
+ protected void verifyData(List<Bytes> results) throws IOException {
+ AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(
+ new Schema.Parser().parse(buildAvroRecordSchemaInJson(fieldInfos)));
+
+ List<String> actualData = new ArrayList<>(testRows.size());
+ for (Bytes result : results) {
+ GenericRecord genericRecord = deserializationSchema.deserialize(result.get());
+ Row tempRow = new Row(5);
+ tempRow.setField(0, genericRecord.get("f1"));
+ tempRow.setField(1, genericRecord.get("f2"));
+ tempRow.setField(2, genericRecord.get("f3"));
+ ByteBuffer f3 = (ByteBuffer) genericRecord.get("f4");
+ tempRow.setField(3, f3.array());
+ Map<Utf8, GenericRecord> f5 = (Map<Utf8, GenericRecord>) genericRecord.get("f5");
+ Map<String, Row> tempMap = new HashMap<>();
+ for (Map.Entry<Utf8, GenericRecord> utf8GenericRecordEntry : f5.entrySet()) {
+ String key = new String(utf8GenericRecordEntry.getKey().getBytes());
+ GenericRecord value = utf8GenericRecordEntry.getValue();
+ tempMap.put(key, Row.of(value.get("f51"), value.get("f52")));
+ }
+ tempRow.setField(4, tempMap);
+ actualData.add(tempRow.toString());
+ }
+ actualData.sort(String::compareTo);
+
+ List<String> expectedData = new ArrayList<>(testRows.size());
+ testRows.forEach(row -> expectedData.add(row.toString()));
+ expectedData.sort(String::compareTo);
+
+ assertEquals(expectedData, actualData);
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
new file mode 100644
index 0000000..8a56347
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that rows of every {@link RowKind} are written to Kafka by the canal serialization schema
+ * as JSON objects with a {@code data} array and a {@code type} field.
+ */
+public class RowToCanalKafkaSinkTest extends KafkaSinkTestBaseForRowData {
+
+    /** Sets the topic, the two-column field layout and the canal serialization schema, then builds the rows. */
+    @Override
+    protected void prepareData() {
+        topic = "test_kafka_row_to_canal";
+        fieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", new StringFormatInfo()),
+                new FieldInfo("f2", new IntFormatInfo())
+        };
+
+        // NOTE(review): the meaning of the four CanalSerializationInfo arguments is not visible from
+        // this test; confirm against the CanalSerializationInfo constructor.
+        serializationSchema = RowDataSerializationSchemaFactory.build(
+                fieldInfos, new CanalSerializationInfo("sql", "literal", "null", true)
+        );
+
+        prepareTestData();
+    }
+
+    /** Creates one row per {@link RowKind} so that every change type goes through the sink. */
+    private void prepareTestData() {
+        testRows = new ArrayList<>();
+        testRows.add(rowOfKind(RowKind.INSERT, "Anna", 100));
+        testRows.add(rowOfKind(RowKind.DELETE, "Lisa", 90));
+        testRows.add(rowOfKind(RowKind.UPDATE_BEFORE, "Bob", 80));
+        testRows.add(rowOfKind(RowKind.UPDATE_AFTER, "Tom", 70));
+    }
+
+    /** Builds a row holding {@code fields} and tags it with {@code kind}. */
+    private static Row rowOfKind(RowKind kind, Object... fields) {
+        Row row = Row.of(fields);
+        row.setKind(kind);
+        return row;
+    }
+
+    /**
+     * Decodes the Kafka record values as UTF-8 text and compares them, order-insensitively, with the
+     * expected canal JSON documents.
+     *
+     * @param results the raw record values read back from Kafka
+     */
+    @Override
+    protected void verifyData(List<Bytes> results) {
+        List<String> actualData = new ArrayList<>();
+        // Decode explicitly as UTF-8 (as the JSON and string sink tests do) instead of relying on the
+        // platform default charset.
+        results.forEach(value -> actualData.add(new String(value.get(), StandardCharsets.UTF_8)));
+        actualData.sort(String::compareTo);
+
+        // The test expects UPDATE_BEFORE to be reported as DELETE and UPDATE_AFTER as INSERT.
+        List<String> expectedData = new ArrayList<>();
+        expectedData.add("{\"data\":[{\"f1\":\"Bob\",\"f2\":80}],\"type\":\"DELETE\"}");
+        expectedData.add("{\"data\":[{\"f1\":\"Tom\",\"f2\":70}],\"type\":\"INSERT\"}");
+        expectedData.add("{\"data\":[{\"f1\":\"Lisa\",\"f2\":90}],\"type\":\"DELETE\"}");
+        expectedData.add("{\"data\":[{\"f1\":\"Anna\",\"f2\":100}],\"type\":\"INSERT\"}");
+        expectedData.sort(String::compareTo);
+
+        assertEquals(expectedData, actualData);
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
index 7bade20..7239dc7 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -26,7 +26,10 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -34,22 +37,20 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
-public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
+public class RowToJsonKafkaSinkTest extends KafkaSinkTestBaseForRow {
@Override
protected void prepareData() {
topic = "test_kafka_row_to_json";
- prepareKafkaSinkInfo();
- prepareTestRows();
- }
+ serializationSchema = RowSerializationSchemaFactory.build(
+ new FieldInfo[]{
+ new FieldInfo("f1", new StringFormatInfo()),
+ new FieldInfo("f2", new MapFormatInfo(new StringFormatInfo(), new DoubleFormatInfo())),
+ new FieldInfo("f3", new ArrayFormatInfo(new IntFormatInfo()))
+ },
+ new JsonSerializationInfo()
+ );
- private void prepareKafkaSinkInfo() {
- fieldInfos = new FieldInfo[]{
- new FieldInfo("f1", new StringFormatInfo()),
- new FieldInfo("f2", new MapFormatInfo(new StringFormatInfo(), new DoubleFormatInfo())),
- new FieldInfo("f3", new ArrayFormatInfo(new IntFormatInfo()))
- };
-
- serializationInfo = new JsonSerializationInfo();
+ prepareTestRows();
}
private void prepareTestRows() {
@@ -69,14 +70,17 @@ public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
}
@Override
- protected void verifyData(List<String> results) {
+ protected void verifyData(List<Bytes> results) {
+ List<String> actualData = new ArrayList<>();
+ results.forEach(value -> actualData.add(new String(value.get(), StandardCharsets.UTF_8)));
+ actualData.sort(String::compareTo);
+
List<String> expectedData = new ArrayList<>();
expectedData.add("{\"f1\":\"zhangsan\",\"f2\":{\"high\":170.5},\"f3\":[123]}");
expectedData.add("{\"f1\":\"lisi\",\"f2\":{\"high\":180.5},\"f3\":[1234]}");
expectedData.add("{\"f1\":\"wangwu\",\"f2\":{\"high\":190.5},\"f3\":[12345]}");
-
- results.sort(String::compareTo);
expectedData.sort(String::compareTo);
- assertEquals(expectedData, results);
+
+ assertEquals(expectedData, actualData);
}
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
index fc10f1f..dbab9b6 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
@@ -22,30 +22,28 @@ import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
-public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
+public class RowToStringKafkaSinkTest extends KafkaSinkTestBaseForRow {
@Override
protected void prepareData() {
topic = "test_kafka_row_to_string";
- prepareKafkaSinkInfo();
- prepareTestRows();
- }
-
- private void prepareKafkaSinkInfo() {
- fieldInfos = new FieldInfo[]{
- new FieldInfo("f1", new StringFormatInfo()),
- new FieldInfo("f2", new DoubleFormatInfo())
- };
- serializationInfo = null;
- }
+ serializationSchema = RowSerializationSchemaFactory.build(
+ new FieldInfo[]{
+ new FieldInfo("f1", new StringFormatInfo()),
+ new FieldInfo("f2", new DoubleFormatInfo())
+ },
+ null
+ );
- private void prepareTestRows() {
testRows = new ArrayList<>();
testRows.add(Row.of("f1", 12.0));
testRows.add(Row.of("f2", 12.1));
@@ -53,13 +51,15 @@ public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
}
@Override
- protected void verifyData(List<String> results) {
+ protected void verifyData(List<Bytes> results) {
+ List<String> actualData = new ArrayList<>();
+ results.forEach(value -> actualData.add(new String(value.get(), StandardCharsets.UTF_8)));
+ actualData.sort(String::compareTo);
+
List<String> expectedData = new ArrayList<>();
testRows.forEach(row -> expectedData.add(row.toString()));
-
- results.sort(String::compareTo);
expectedData.sort(String::compareTo);
- assertEquals(expectedData, results);
+ assertEquals(expectedData, actualData);
}
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
index 0547202..c2894e9 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
@@ -20,11 +20,23 @@ package org.apache.inlong.sort.singletenant.flink.utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.ShortFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.junit.Test;
+import java.io.IOException;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -43,4 +55,245 @@ public class CommonUtilsTest {
assertEquals(Types.STRING, fieldTypesFlink[0]);
assertEquals(Types.BOOLEAN, fieldTypesFlink[1]);
}
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Checks the Avro record schema generated for flat fields: the test expects each field to become a
+     * union of {@code "null"} and its Avro type with a {@code null} default, and the byte field to be
+     * declared as Avro {@code "int"}.
+     */
+    @Test
+    public void testBuildAvroRecordSchemaInJsonForNormalFields() throws IOException {
+        final FieldInfo[] flatFields = {
+                new FieldInfo("f1", new StringFormatInfo()),
+                new FieldInfo("f2", new ByteFormatInfo())
+        };
+
+        final String expectedSchema = "{\n"
+                + " \"type\":\"record\",\n"
+                + " \"name\":\"record\",\n"
+                + " \"fields\":[\n"
+                + " {\n"
+                + " \"name\":\"f1\",\n"
+                + " \"type\":[\n"
+                + " \"null\",\n"
+                + " \"string\"\n"
+                + " ],\n"
+                + " \"default\":null\n"
+                + " },\n"
+                + " {\n"
+                + " \"name\":\"f2\",\n"
+                + " \"type\":[\n"
+                + " \"null\",\n"
+                + " \"int\"\n"
+                + " ],\n"
+                + " \"default\":null\n"
+                + " }\n"
+                + " ]\n"
+                + "}";
+
+        // Compare parsed trees so that whitespace and formatting differences do not matter.
+        final JsonNode expected = objectMapper.readTree(expectedSchema);
+        final JsonNode actual = objectMapper.readTree(buildAvroRecordSchemaInJson(flatFields));
+
+        assertEquals(expected, actual);
+    }
+
+ /**
+ * Checks the Avro record schema generated for nested container types: arrays, maps and rows that
+ * contain each other.
+ *
+ * <p>The expected schema below wraps every field, array item and map value type in a union with
+ * {@code "null"}, declares short and byte elements as {@code "int"}, and names nested records
+ * {@code record_f2}, {@code record_f3} and {@code record_f3_f32}.
+ */
+ @Test
+ public void testBuildAvroRecordSchemaInJsonForRecursiveFields() throws IOException {
+ FieldInfo[] testFieldInfos = new FieldInfo[]{
+ // f1: array of map(string -> array of array of short)
+ new FieldInfo("f1", new ArrayFormatInfo(
+ new MapFormatInfo(
+ new StringFormatInfo(),
+ new ArrayFormatInfo(new ArrayFormatInfo(new ShortFormatInfo()))
+ )
+ )),
+ // f2: map(string -> map(string -> row(f21 int, f22 array of byte)))
+ new FieldInfo("f2", new MapFormatInfo(
+ new StringFormatInfo(),
+ new MapFormatInfo(
+ new StringFormatInfo(),
+ new RowFormatInfo(
+ new String[]{"f21", "f22"},
+ new FormatInfo[]{new IntFormatInfo(), new ArrayFormatInfo(new ByteFormatInfo())}
+ )
+ )
+ )),
+ // f3: row(f31 array of string, f32 row(f321 array of int, f322 map(string -> array of byte)))
+ new FieldInfo("f3", new RowFormatInfo(
+ new String[]{"f31", "f32"},
+ new FormatInfo[]{
+ new ArrayFormatInfo(new StringFormatInfo()),
+ new RowFormatInfo(
+ new String[]{"f321", "f322"},
+ new FormatInfo[]{
+ new ArrayFormatInfo(new IntFormatInfo()),
+ new MapFormatInfo(
+ new StringFormatInfo(),
+ new ArrayFormatInfo(new ByteFormatInfo())
+ )
+ }
+ )
+ }
+ ))
+ };
+
+ // Expected schema, parsed into a tree so the comparison ignores whitespace.
+ JsonNode expectedJsonNode = objectMapper.readTree("{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"record\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"f1\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"array\",\n"
+ + " \"items\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"map\",\n"
+ + " \"values\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"array\",\n"
+ + " \"items\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"array\",\n"
+ + " \"items\":[\n"
+ + " \"null\",\n"
+ + " \"int\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"f2\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"map\",\n"
+ + " \"values\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"map\",\n"
+ + " \"values\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"record_f2\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"f21\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " \"int\"\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"f22\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"array\",\n"
+ + " \"items\":[\n"
+ + " \"null\",\n"
+ + " \"int\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"f3\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"record_f3\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"f31\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"array\",\n"
+ + " \"items\":[\n"
+ + " \"null\",\n"
+ + " \"string\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"f32\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"record_f3_f32\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"f321\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"array\",\n"
+ + " \"items\":[\n"
+ + " \"null\",\n"
+ + " \"int\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"f322\",\n"
+ + " \"type\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"map\",\n"
+ + " \"values\":[\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\":\"array\",\n"
+ + " \"items\":[\n"
+ + " \"null\",\n"
+ + " \"int\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\":null\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+
+ String actualJson = buildAvroRecordSchemaInJson(testFieldInfos);
+ JsonNode actualJsonNode = objectMapper.readTree(actualJson);
+
+ assertEquals(expectedJsonNode, actualJsonNode);
+ }
}