You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/26 08:20:16 UTC
[incubator-inlong] branch master updated: [INLONG-3372][Sort] Fix the binlog type is always INSERT when the output format is canal (#3373)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1c64fa7 [INLONG-3372][Sort] Fix the binlog type is always INSERT when the output format is canal (#3373)
1c64fa7 is described below
commit 1c64fa76e0eee86d9d1e449ea559a983cd8644cf
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Sat Mar 26 16:20:12 2022 +0800
[INLONG-3372][Sort] Fix the binlog type is always INSERT when the output format is canal (#3373)
Co-authored-by: tianqiwan <ti...@tencent.com>
Co-authored-by: dockerzhang <do...@apache.org>
---
.../flink/transformation/Transformer.java | 1 +
.../singletenant/flink/DebeziumToCanalITCase.java | 68 ++++++++++++++++++----
2 files changed, 59 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java
index e030fd0..3703e30 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java
@@ -88,6 +88,7 @@ public class Transformer extends ProcessFunction<Row, Row> {
return row -> {
final int sinkFieldsLength = sinkFieldInfos.length;
Row sinkRow = new Row(sinkFieldsLength);
+ sinkRow.setKind(row.getKind());
for (int i = 0; i < sinkFieldsLength; i++) {
String sinkFieldName = sinkFieldInfos[i].getName();
String sourceFieldName = sinkFieldNameToSourceFieldName.get(sinkFieldName);
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
index 9a78959..e8f0495 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
@@ -42,10 +42,14 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
+import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationFunction;
import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationSchemaFactory;
import org.apache.inlong.sort.singletenant.flink.deserialization.FieldMappingTransformer;
import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.transformation.Transformer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +62,7 @@ public class DebeziumToCanalITCase {
private static final CountDownLatch jobFinishedLatch = new CountDownLatch(1);
- private final FieldInfo[] fieldInfos = new FieldInfo[]{
+ private final FieldInfo[] sourceFieldInfos = new FieldInfo[]{
new FieldInfo("name", StringFormatInfo.INSTANCE),
new FieldInfo("age", IntFormatInfo.INSTANCE),
new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
@@ -68,9 +72,20 @@ public class DebeziumToCanalITCase {
new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
};
+ private final FieldInfo[] sinkFieldInfos = new FieldInfo[]{
+ new FieldInfo("name", StringFormatInfo.INSTANCE),
+ new FieldInfo("age", IntFormatInfo.INSTANCE),
+ new FieldInfo("inner_type", StringFormatInfo.INSTANCE),
+ new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
+ new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
+ new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
+ new BuiltInFieldInfo("isDdl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
+ new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
+ };
+
private static final String expectedResult =
- "{\"data\":[{\"name\":\"testName\",\"age\":29}],"
- + "\"type\":\"INSERT\",\"database\":\"test\",\"table\":\"test\","
+ "{\"data\":[{\"name\":\"testName\",\"age\":29,\"inner_type\":\"-D\"}],"
+ + "\"type\":\"DELETE\",\"database\":\"test\",\"table\":\"test\","
+ "\"es\":1644896917208,\"isDdl\":false}";
@Test(timeout = 60 * 1000)
@@ -86,19 +101,52 @@ public class DebeziumToCanalITCase {
// Deserialize
DeserializationSchema<Row> deserializationSchema = DeserializationSchemaFactory.build(
- fieldInfos,
+ sourceFieldInfos,
new DebeziumDeserializationInfo(false, "ISO_8601"));
FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(
- new Configuration(), fieldInfos);
+ new Configuration(), sourceFieldInfos);
DeserializationFunction function = new DeserializationFunction(
deserializationSchema, fieldMappingTransformer, false);
DataStream<Row> deserializedStream = sourceStream.process(function);
+ // Transform
+ TransformationInfo transformationInfo = new TransformationInfo(
+ new FieldMappingRule(new FieldMappingUnit[] {
+ new FieldMappingUnit(
+ new FieldInfo("name", StringFormatInfo.INSTANCE),
+ new FieldInfo("name", IntFormatInfo.INSTANCE)),
+ new FieldMappingUnit(
+ new FieldInfo("age", StringFormatInfo.INSTANCE),
+ new FieldInfo("age", IntFormatInfo.INSTANCE)),
+ new FieldMappingUnit(
+ new FieldInfo("type", StringFormatInfo.INSTANCE),
+ new FieldInfo("inner_type", IntFormatInfo.INSTANCE)),
+ new FieldMappingUnit(
+ new FieldInfo("db", StringFormatInfo.INSTANCE),
+ new FieldInfo("db", IntFormatInfo.INSTANCE)),
+ new FieldMappingUnit(
+ new FieldInfo("table", StringFormatInfo.INSTANCE),
+ new FieldInfo("table", IntFormatInfo.INSTANCE)),
+ new FieldMappingUnit(
+ new FieldInfo("es", StringFormatInfo.INSTANCE),
+ new FieldInfo("es", IntFormatInfo.INSTANCE)),
+ new FieldMappingUnit(
+ new FieldInfo("isDdl", StringFormatInfo.INSTANCE),
+ new FieldInfo("isDdl", IntFormatInfo.INSTANCE)),
+ new FieldMappingUnit(
+ new FieldInfo("type", StringFormatInfo.INSTANCE),
+ new FieldInfo("type", IntFormatInfo.INSTANCE)),
+ }));
+ DataStream<Row> transformedStream = deserializedStream.process(new Transformer(
+ transformationInfo,
+ sourceFieldInfos,
+ sinkFieldInfos));
+
// Serialize and output
SerializationSchema<Row> serializationSchema = SerializationSchemaFactory.build(
- fieldInfos, new CanalSerializationInfo()
+ sinkFieldInfos, new CanalSerializationInfo()
);
- deserializedStream.addSink(new TestSink(serializationSchema));
+ transformedStream.addSink(new TestSink(serializationSchema));
env.execute();
@@ -134,8 +182,8 @@ public class DebeziumToCanalITCase {
private static class TestSource extends RichSourceFunction<SerializedRecord> {
String testString = "{\n"
- + " \"before\":null,\n"
- + " \"after\":{\n"
+ + " \"after\":null,\n"
+ + " \"before\":{\n"
+ " \"name\":\"testName\",\n"
+ " \"age\":29\n"
+ " },\n"
@@ -155,7 +203,7 @@ public class DebeziumToCanalITCase {
+ " \"thread\":13,\n"
+ " \"query\":null\n"
+ " },\n"
- + " \"op\":\"c\",\n"
+ + " \"op\":\"d\",\n"
+ " \"ts_ms\":1644896917208,\n"
+ " \"transaction\":null\n"
+ "}";