You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/26 08:20:16 UTC

[incubator-inlong] branch master updated: [INLONG-3372][Sort] Fix the binlog type is always INSERT when the output format is canal (#3373)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c64fa7  [INLONG-3372][Sort] Fix the binlog type is always INSERT when the output format is canal (#3373)
1c64fa7 is described below

commit 1c64fa76e0eee86d9d1e449ea559a983cd8644cf
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Sat Mar 26 16:20:12 2022 +0800

    [INLONG-3372][Sort] Fix the binlog type is always INSERT when the output format is canal (#3373)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
    Co-authored-by: dockerzhang <do...@apache.org>
---
 .../flink/transformation/Transformer.java          |  1 +
 .../singletenant/flink/DebeziumToCanalITCase.java  | 68 ++++++++++++++++++----
 2 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java
index e030fd0..3703e30 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/transformation/Transformer.java
@@ -88,6 +88,7 @@ public class Transformer extends ProcessFunction<Row, Row> {
             return row -> {
                 final int sinkFieldsLength = sinkFieldInfos.length;
                 Row sinkRow = new Row(sinkFieldsLength);
+                sinkRow.setKind(row.getKind());
                 for (int i = 0; i < sinkFieldsLength; i++) {
                     String sinkFieldName = sinkFieldInfos[i].getName();
                     String sourceFieldName = sinkFieldNameToSourceFieldName.get(sinkFieldName);
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
index 9a78959..e8f0495 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
@@ -42,10 +42,14 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
 import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
+import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
 import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationFunction;
 import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationSchemaFactory;
 import org.apache.inlong.sort.singletenant.flink.deserialization.FieldMappingTransformer;
 import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.transformation.Transformer;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +62,7 @@ public class DebeziumToCanalITCase {
 
     private static final CountDownLatch jobFinishedLatch = new CountDownLatch(1);
 
-    private final FieldInfo[] fieldInfos = new FieldInfo[]{
+    private final FieldInfo[] sourceFieldInfos = new FieldInfo[]{
             new FieldInfo("name", StringFormatInfo.INSTANCE),
             new FieldInfo("age", IntFormatInfo.INSTANCE),
             new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
@@ -68,9 +72,20 @@ public class DebeziumToCanalITCase {
             new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
     };
 
+    private final FieldInfo[] sinkFieldInfos = new FieldInfo[]{
+            new FieldInfo("name", StringFormatInfo.INSTANCE),
+            new FieldInfo("age", IntFormatInfo.INSTANCE),
+            new FieldInfo("inner_type", StringFormatInfo.INSTANCE),
+            new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
+            new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
+            new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
+            new BuiltInFieldInfo("isDdl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
+            new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
+    };
+
     private static final String expectedResult =
-            "{\"data\":[{\"name\":\"testName\",\"age\":29}],"
-                    + "\"type\":\"INSERT\",\"database\":\"test\",\"table\":\"test\","
+            "{\"data\":[{\"name\":\"testName\",\"age\":29,\"inner_type\":\"-D\"}],"
+                    + "\"type\":\"DELETE\",\"database\":\"test\",\"table\":\"test\","
                     + "\"es\":1644896917208,\"isDdl\":false}";
 
     @Test(timeout = 60 * 1000)
@@ -86,19 +101,52 @@ public class DebeziumToCanalITCase {
 
                 // Deserialize
                 DeserializationSchema<Row> deserializationSchema = DeserializationSchemaFactory.build(
-                        fieldInfos,
+                        sourceFieldInfos,
                         new DebeziumDeserializationInfo(false, "ISO_8601"));
                 FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(
-                        new Configuration(), fieldInfos);
+                        new Configuration(), sourceFieldInfos);
                 DeserializationFunction function = new DeserializationFunction(
                         deserializationSchema, fieldMappingTransformer, false);
                 DataStream<Row> deserializedStream = sourceStream.process(function);
 
+                // Transform
+                TransformationInfo transformationInfo = new TransformationInfo(
+                        new FieldMappingRule(new FieldMappingUnit[] {
+                                new FieldMappingUnit(
+                                        new FieldInfo("name", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("name", IntFormatInfo.INSTANCE)),
+                                new FieldMappingUnit(
+                                        new FieldInfo("age", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("age", IntFormatInfo.INSTANCE)),
+                                new FieldMappingUnit(
+                                        new FieldInfo("type", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("inner_type", IntFormatInfo.INSTANCE)),
+                                new FieldMappingUnit(
+                                        new FieldInfo("db", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("db", IntFormatInfo.INSTANCE)),
+                                new FieldMappingUnit(
+                                        new FieldInfo("table", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("table", IntFormatInfo.INSTANCE)),
+                                new FieldMappingUnit(
+                                        new FieldInfo("es", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("es", IntFormatInfo.INSTANCE)),
+                                new FieldMappingUnit(
+                                        new FieldInfo("isDdl", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("isDdl", IntFormatInfo.INSTANCE)),
+                                new FieldMappingUnit(
+                                        new FieldInfo("type", StringFormatInfo.INSTANCE),
+                                        new FieldInfo("type", IntFormatInfo.INSTANCE)),
+                        }));
+                DataStream<Row> transformedStream = deserializedStream.process(new Transformer(
+                        transformationInfo,
+                        sourceFieldInfos,
+                        sinkFieldInfos));
+
                 // Serialize and output
                 SerializationSchema<Row> serializationSchema = SerializationSchemaFactory.build(
-                        fieldInfos, new CanalSerializationInfo()
+                        sinkFieldInfos, new CanalSerializationInfo()
                 );
-                deserializedStream.addSink(new TestSink(serializationSchema));
+                transformedStream.addSink(new TestSink(serializationSchema));
 
                 env.execute();
 
@@ -134,8 +182,8 @@ public class DebeziumToCanalITCase {
     private static class TestSource extends RichSourceFunction<SerializedRecord> {
 
         String testString = "{\n"
-                + "    \"before\":null,\n"
-                + "    \"after\":{\n"
+                + "    \"after\":null,\n"
+                + "    \"before\":{\n"
                 + "        \"name\":\"testName\",\n"
                 + "        \"age\":29\n"
                 + "    },\n"
@@ -155,7 +203,7 @@ public class DebeziumToCanalITCase {
                 + "        \"thread\":13,\n"
                 + "        \"query\":null\n"
                 + "    },\n"
-                + "    \"op\":\"c\",\n"
+                + "    \"op\":\"d\",\n"
                 + "    \"ts_ms\":1644896917208,\n"
                 + "    \"transaction\":null\n"
                 + "}";