Posted to commits@seatunnel.apache.org by "TaoZex (via GitHub)" <gi...@apache.org> on 2023/03/06 13:19:44 UTC

[GitHub] [incubator-seatunnel] TaoZex opened a new pull request, #3981: [Feature][connector][kafka] Support read debezium format message from kafka

TaoZex opened a new pull request, #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981

   
   ## Purpose of this pull request
   https://github.com/apache/incubator-seatunnel/issues/3743
   Support reading Debezium-format messages from Kafka.
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or tests are not needed for this reason:
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update the change log in the connector document. For more details, refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1396544259

   > > I will add e2e test for this feature.
   > 
   > Reference: https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
   
   https://debezium.io/documentation/reference/2.1/tutorial.html#starting-kafka-connect




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1500904243

   Please rebase onto the dev branch.




[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1124008002


##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DebeziumJsonSerDeSchemaTest {
+
+    private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {"id", "name", "description", "weight"},
+                    new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE, STRING_TYPE, FLOAT_TYPE});
+
+    @Test
+    void testNullRowMessages() throws Exception {
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false, false);
+        SimpleCollector collector = new SimpleCollector();
+
+        deserializationSchema.deserialize(null, collector);
+        deserializationSchema.deserialize(new byte[0], collector);
+        assertEquals(0, collector.list.size());
+    }
+
+    @Test
+    public void testSerializationAndSchemaIncludeDeserialization() throws Exception {
+        testSerializationDeserialization("debezium-data-schema-include.txt", true);
+    }
+
+    @Test
+    public void testSerializationAndSchemaExcludeDeserialization() throws Exception {
+        testSerializationDeserialization("debezium-data-schema-exclude.txt", false);
+    }
+
+    private void testSerializationDeserialization(String resourceFile, boolean schemaInclude)
+            throws Exception {
+        List<String> lines = readLines(resourceFile);
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, schemaInclude, false);
+
+        SimpleCollector collector = new SimpleCollector();
+
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+        }
+
+        List<String> expected =
+                Arrays.asList(
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[102, car battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[104, hammer, 12oz carpenter's hammer, 0.75]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[105, hammer, 14oz carpenter's hammer, 0.875]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[108, jacket, water resistent black wind breaker, 0.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[109, spare tire, 24 inch spare tire, 22.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[106, hammer, 18oz carpenter hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[107, rocks, box of assorted rocks, 5.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[110, jacket, new water resistent white wind breaker, 0.5]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=-1, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}");
+        List<String> actual =
+                collector.list.stream().map(Object::toString).collect(Collectors.toList());
+        assertEquals(expected, actual);
+
+        DebeziumJsonSerializationSchema serializationSchema =
+                new DebeziumJsonSerializationSchema(PHYSICAL_DATA_TYPE);
+
+        actual = new ArrayList<>();
+        for (SeaTunnelRow rowData : collector.list) {
+            actual.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+        }
+
+        expected =
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
+        assertEquals(expected, actual);
+    }
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;

Review Comment:
   Done. Thanks for your advice.
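
Judging only by the expected strings in the test above, the serializer seems to write `+I` and `+U` rows as `"op":"c"` with the row under `after`, and `-U` and `-D` rows as `"op":"d"` with the row under `before`. The sketch below illustrates that mapping. It is an assumption drawn from the test data, not the PR's actual `DebeziumJsonSerializationSchema`; `DebeziumEnvelopeSketch`, `Kind`, and `toEnvelope` are hypothetical names, and the row is passed in as an already-serialized JSON string.

```java
public class DebeziumEnvelopeSketch {

    /** Hypothetical stand-in for SeaTunnel's RowKind so the sketch compiles on its own. */
    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Wraps an already-serialized row in a Debezium-style envelope.
     * Rows that add data go under "after" with op "c";
     * rows that retract data go under "before" with op "d".
     */
    public static String toEnvelope(Kind kind, String rowJson) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "{\"before\":null,\"after\":" + rowJson + ",\"op\":\"c\"}";
            case UPDATE_BEFORE:
            case DELETE:
                return "{\"before\":" + rowJson + ",\"after\":null,\"op\":\"d\"}";
            default:
                throw new IllegalArgumentException("Unsupported row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        // prints {"before":{"id":111},"after":null,"op":"d"}
        System.out.println(toEnvelope(Kind.DELETE, "{\"id\":111}"));
    }
}
```

Under this mapping an update is emitted as a delete followed by a create, which matches the order of the expected lines in the test.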





[GitHub] [incubator-seatunnel] TaoZex commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1501032182

   Waiting for https://github.com/apache/incubator-seatunnel/pull/4524 to be merged.




[GitHub] [incubator-seatunnel] TaoZex commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1418949089

   > Refer to this PR for how to start the MySQL, Kafka, and Debezium Docker containers:
   > 
   > #3950 https://debezium.io/documentation/reference/2.1/tutorial.html#starting-services
   
   




[GitHub] [incubator-seatunnel] 2000liux commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by GitBox <gi...@apache.org>.
2000liux commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1396817521

   Nice.




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1407942360

   Refer to this PR for how to start the MySQL, Kafka, and Debezium Docker containers:
   
   https://github.com/apache/incubator-seatunnel/pull/3950
   https://debezium.io/documentation/reference/2.1/tutorial.html#starting-services




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1133636370


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class DebeziumJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Debezium Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private final SeaTunnelRowType rowType;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    /**
+     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
+     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
+     * information, but we just ignore "schema" and extract data from "payload".
+     */
+    private final boolean schemaInclude;
+
+    private final boolean ignoreParseErrors;
+
+    public DebeziumJsonDeserializationSchema(
+            SeaTunnelRowType rowType, boolean schemaInclude, boolean ignoreParseErrors) {
+        this.rowType = rowType;
+        this.schemaInclude = schemaInclude;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(
+                        false, ignoreParseErrors, createJsonRowType(rowType, schemaInclude));
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {

Review Comment:
   example
   ```java
   public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
       if (message == null || message.length == 0) {
           // skip tombstone messages
           return;
       }
   
       ObjectNode jsonNode = (ObjectNode) convertBytes(message);
       String op = jsonNode.get("op").asText();
       switch (op) {
           case "r":
           case "c":
               // snapshot reads and inserts carry the new row in "after"
               SeaTunnelRow insert = convert(jsonNode.get("after"));
               insert.setRowKind(RowKind.INSERT);
               out.collect(insert);
               break;
           case "u":
               // updates emit the old row image first, then the new one
               SeaTunnelRow before = convert(jsonNode.get("before"));
               before.setRowKind(RowKind.UPDATE_BEFORE);
               out.collect(before);
   
               SeaTunnelRow after = convert(jsonNode.get("after"));
               after.setRowKind(RowKind.UPDATE_AFTER);
               out.collect(after);
               break;
           case "d":
               // deletes carry the removed row in "before"; "after" is null
               SeaTunnelRow delete = convert(jsonNode.get("before"));
               delete.setRowKind(RowKind.DELETE);
               out.collect(delete);
               break;
       }
   }
   ```
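
The dispatch in the example above can also be tried outside SeaTunnel. What follows is a minimal, self-contained sketch of the same `op` to row-kind mapping. `DebeziumOpMapping`, `Kind`, `Change`, and `toChanges` are hypothetical names introduced only for illustration (`Kind` stands in for `RowKind`), and row images are plain strings rather than parsed JSON nodes.

```java
import java.util.ArrayList;
import java.util.List;

public class DebeziumOpMapping {

    /** Hypothetical stand-in for SeaTunnel's RowKind so the sketch compiles on its own. */
    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** A row image paired with the change kind it is emitted with. */
    public static final class Change {
        final Kind kind;
        final String image;

        Change(Kind kind, String image) {
            this.kind = kind;
            this.image = image;
        }

        @Override
        public String toString() {
            return kind + ":" + image;
        }
    }

    /** Maps one Debezium event (op plus before/after images) to the rows it produces. */
    public static List<Change> toChanges(String op, String before, String after) {
        List<Change> out = new ArrayList<>();
        switch (op) {
            case "r": // snapshot read
            case "c": // insert
                out.add(new Change(Kind.INSERT, after));
                break;
            case "u": // update: old image first, then new image
                out.add(new Change(Kind.UPDATE_BEFORE, before));
                out.add(new Change(Kind.UPDATE_AFTER, after));
                break;
            case "d": // delete: only the old image exists
                out.add(new Change(Kind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [UPDATE_BEFORE:old, UPDATE_AFTER:new]
        System.out.println(toChanges("u", "old", "new"));
    }
}
```

Rejecting unknown `op` values in the `default` branch is a choice made for this sketch; the reviewer's example silently ignores them.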





[GitHub] [seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/seatunnel/pull/3981#issuecomment-1570222859

   <img width="873" alt="image" src="https://github.com/apache/seatunnel/assets/14371345/0a7f6a29-7b36-4f98-b8cc-969a4b923fa2">
   




[GitHub] [incubator-seatunnel] TaoZex closed pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex closed pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka
URL: https://github.com/apache/incubator-seatunnel/pull/3981




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1104597462


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SEATUNNEL, EngineType.SPARK})
+public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumToKafkaIT.class);
+
+    private static DockerComposeContainer COMPOSE_CONTAINER;
+
+    // ----------------------------------------------------------------------------
+    // kafka
+
+    private static final String KAFKA_TOPIC = "customer";
+
+    private static final int KAFKA_PORT = 9092;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    // ----------------------------------------------------------------------------
+    // postgres
+    private static final String PG_IMAGE = "postgres:alpine3.16";
+
+    private static final String PG_DRIVER_JAR =
+            "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+    private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + PG_DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+
+                Path jsonPath =
+                        ContainerUtil.getResourcesFile("/debezium/register-mysql.json").toPath();
+                container.copyFileToContainer(
+                        MountableFile.forHostPath(jsonPath),
+                        "/tmp/seatunnel/plugins/Jdbc/register-mysql.json");
+                Container.ExecResult extraCommand =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "cd /tmp/seatunnel/plugins/Jdbc && curl -i -X POST -H \"Accept:application/json\" -H  \"Content-Type:application/json\" http://"
+                                        + getLinuxLocalIp()
+                                        + ":8083/connectors/ -d @register-mysql.json");
+                Assertions.assertEquals(0, extraCommand.getExitCode());
+            };
+
+    private void createPostgreSQLContainer() throws ClassNotFoundException {
+        POSTGRESQL_CONTAINER =
+                new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withExposedPorts(5432)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException {
+        COMPOSE_CONTAINER =
+                new DockerComposeContainer(
+                        new File("src/test/resources/docker/docker-compose-mysql.yaml"));
+        Startables.deepStart(Stream.of(COMPOSE_CONTAINER)).join();
+
+        LOG.info("The stage: Starting PostgreSQL container...");
+        createPostgreSQLContainer();
+        Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+        Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+        LOG.info("postgresql Containers are started");
+
+        given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaConsumer);
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
+    }
+
+    @TestTemplate
+    public void testKafakSinkDebeziumFormat(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/kafkasource_debezium_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        ArrayList<Object> result = new ArrayList<>();
+        List<String> expectedResult =
+                Arrays.asList(
+                        "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"4.56\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");

Review Comment:
   Could you also add a test for the update action?
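
As a rough illustration of what an update case would exercise: judging by the expected rows in this PR's `DebeziumJsonSerDeSchemaTest`, one `u` event appears to become a `-U` row followed by a `+U` row, and those rows are written back out as `d` and `c` events. The sketch below only restates that mapping. It is not the PR's implementation; the class and method names are hypothetical, and treating a snapshot read (`r`) like an insert is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DebeziumOpSketch {

    // Row kinds emitted when reading one Debezium event, keyed by its "op" code.
    static List<String> rowKindsFor(String op) {
        switch (op) {
            case "r": // snapshot read (assumed to behave like an insert)
            case "c": // insert
                return Collections.singletonList("+I");
            case "u": // update: the "before" image first, then the "after" image
                return Arrays.asList("-U", "+U");
            case "d": // delete
                return Collections.singletonList("-D");
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
    }

    // "op" code written back out for one row kind, as the unit test's expected JSON suggests.
    static String opFor(String rowKind) {
        switch (rowKind) {
            case "+I":
            case "+U":
                return "c";
            case "-U":
            case "-D":
                return "d";
            default:
                throw new IllegalArgumentException("Unknown row kind: " + rowKind);
        }
    }

    public static void main(String[] args) {
        // Walk one update event through both directions of the mapping.
        for (String kind : rowKindsFor("u")) {
            System.out.println(kind + " -> op " + opFor(kind));
        }
    }
}
```

An e2e case for updates would then assert on the pair of records produced for a single changed row, rather than on one record.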





[GitHub] [incubator-seatunnel] TaoZex commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by GitBox <gi...@apache.org>.
TaoZex commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1396477370

   I will add an e2e test for this feature.




[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1118844436


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatOptions;
+
+import java.util.Map;
+
+public class DebeziumJsonFormatOptions {
+
+    public static final int GENERATE_ROW_SIZE = 3;
+
+    public static final Option<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+    public static final Option<Boolean> SCHEMA_INCLUDE =
+            Options.key("schema-include")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When setting up a Debezium Kafka Connect, users can enable "
+                                    + "a Kafka configuration 'value.converter.schemas.enable' to include schema in the message. "
+                                    + "This option indicates the Debezium JSON data include the schema in the message or not. "
+                                    + "Default is false.");
+
+    public static boolean getSchemaInclude(Map<String, String> options) {
+        return Boolean.parseBoolean(
+                options.getOrDefault(SCHEMA_INCLUDE.key(), SCHEMA_INCLUDE.toString()));

Review Comment:
   Should the fallback be `SCHEMA_INCLUDE.defaultValue()` rather than `SCHEMA_INCLUDE.toString()`?
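
A minimal sketch of why the fallback matters. The `Option` class below is a simplified, hypothetical stand-in written for this note, not SeaTunnel's actual class; it only assumes that `toString()` returns a description of the option rather than its default value. The default is set to `true` here purely to make the difference visible. In the PR the default is `false`, so the two variants happen to return the same result.

```java
import java.util.HashMap;
import java.util.Map;

public class OptionDefaultSketch {

    // Hypothetical, simplified stand-in for a configuration option holder.
    static final class Option<T> {
        private final String key;
        private final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        String key() {
            return key;
        }

        T defaultValue() {
            return defaultValue;
        }

        @Override
        public String toString() {
            return "Option{key='" + key + "', defaultValue=" + defaultValue + "}";
        }
    }

    // Default deliberately set to true for illustration only.
    static final Option<Boolean> SCHEMA_INCLUDE = new Option<>("schema-include", true);

    // Falls back to the option's description, which parseBoolean never reads as "true".
    static boolean viaToString(Map<String, String> options) {
        return Boolean.parseBoolean(
                options.getOrDefault(SCHEMA_INCLUDE.key(), SCHEMA_INCLUDE.toString()));
    }

    // Falls back to the declared default value.
    static boolean viaDefaultValue(Map<String, String> options) {
        return Boolean.parseBoolean(
                options.getOrDefault(
                        SCHEMA_INCLUDE.key(), String.valueOf(SCHEMA_INCLUDE.defaultValue())));
    }

    public static void main(String[] args) {
        Map<String, String> empty = new HashMap<>();
        System.out.println("toString fallback:     " + viaToString(empty));
        System.out.println("defaultValue fallback: " + viaDefaultValue(empty));
    }
}
```

With an empty options map, the `toString()` variant ignores the declared default, so changing the default later would silently have no effect.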



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatOptions;
+
+import java.util.Map;
+
+public class DebeziumJsonFormatOptions {
+
+    public static final int GENERATE_ROW_SIZE = 3;
+
+    public static final Option<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+    public static final Option<Boolean> SCHEMA_INCLUDE =
+            Options.key("schema-include")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When setting up a Debezium Kafka Connect, users can enable "
+                                    + "a Kafka configuration 'value.converter.schemas.enable' to include schema in the message. "
+                                    + "This option indicates the Debezium JSON data include the schema in the message or not. "
+                                    + "Default is false.");
+
+    public static boolean getSchemaInclude(Map<String, String> options) {
+        return Boolean.parseBoolean(
+                options.getOrDefault(SCHEMA_INCLUDE.key(), SCHEMA_INCLUDE.toString()));
+    }
+
+    public static boolean getIgnoreParseErrors(Map<String, String> options) {
+        return Boolean.parseBoolean(
+                options.getOrDefault(IGNORE_PARSE_ERRORS.key(), IGNORE_PARSE_ERRORS.toString()));

Review Comment:
   Should the fallback be `IGNORE_PARSE_ERRORS.defaultValue()` rather than `IGNORE_PARSE_ERRORS.toString()`?



##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DebeziumJsonSerDeSchemaTest {
+
+    private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {"id", "name", "description", "weight"},
+                    new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE, STRING_TYPE, FLOAT_TYPE});
+
+    @Test
+    void testNullRowMessages() throws Exception {
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false, false);
+        SimpleCollector collector = new SimpleCollector();
+
+        deserializationSchema.deserialize(null, collector);
+        deserializationSchema.deserialize(new byte[0], collector);
+        assertEquals(0, collector.list.size());
+    }
+
+    @Test
+    public void testSerializationAndSchemaIncludeDeserialization() throws Exception {
+        testSerializationDeserialization("debezium-data-schema-include.txt", true);
+    }
+
+    @Test
+    public void testSerializationAndSchemaExcludeDeserialization() throws Exception {
+        testSerializationDeserialization("debezium-data-schema-exclude.txt", false);
+    }
+
+    private void testSerializationDeserialization(String resourceFile, boolean schemaInclude)
+            throws Exception {
+        List<String> lines = readLines(resourceFile);
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, schemaInclude, false);
+
+        SimpleCollector collector = new SimpleCollector();
+
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+        }
+
+        List<String> expected =
+                Arrays.asList(
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[102, car battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[104, hammer, 12oz carpenter's hammer, 0.75]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[105, hammer, 14oz carpenter's hammer, 0.875]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[108, jacket, water resistent black wind breaker, 0.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[109, spare tire, 24 inch spare tire, 22.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[106, hammer, 18oz carpenter hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[107, rocks, box of assorted rocks, 5.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[110, jacket, new water resistent white wind breaker, 0.5]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=-1, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}");
+        List<String> actual =
+                collector.list.stream().map(Object::toString).collect(Collectors.toList());
+        assertEquals(expected, actual);
+
+        DebeziumJsonSerializationSchema serializationSchema =
+                new DebeziumJsonSerializationSchema(PHYSICAL_DATA_TYPE);
+
+        actual = new ArrayList<>();
+        for (SeaTunnelRow rowData : collector.list) {
+            actual.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+        }
+
+        expected =
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
+        assertEquals(expected, actual);
+    }
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;

Review Comment:
   Use `Assertions.assertNotNull(url)` here instead of the `assert` keyword.
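
For background: the `assert` keyword is only evaluated when the JVM runs with assertions enabled (`-ea`), so an explicit check is more dependable in a test helper. In the test class itself, JUnit 5's `Assertions.assertNotNull` is the natural choice. The sketch below uses `Objects.requireNonNull` instead so that it runs without JUnit on the classpath; the class name, method name, and message text are hypothetical.

```java
import java.net.URL;
import java.util.Objects;

public class ResourceLookupSketch {

    // Resolves a classpath resource and fails with a clear message if it is missing.
    static URL requireResource(String resource) {
        URL url = ResourceLookupSketch.class.getClassLoader().getResource(resource);
        // This check runs whether or not the JVM was started with -ea.
        return Objects.requireNonNull(url, "Test resource not found on classpath: " + resource);
    }

    public static void main(String[] args) {
        try {
            requireResource("no-such-resource.txt");
        } catch (NullPointerException e) {
            // The message names the missing file instead of failing later with an opaque error.
            System.out.println(e.getMessage());
        }
    }
}
```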





[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1396541630

   > I will add an e2e test for this feature.
   
   For reference, see:
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1407940422

   > Could you please add an e2e test case for this pull request?
   
   @TaoZex 
   For reference, this test shows how to check the CDC sink output:
   
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java#L87




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1126095803


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class DebeziumJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Debezium Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private final SeaTunnelRowType rowType;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    /**
+     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
+     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
+     * information, but we just ignore "schema" and extract data from "payload".
+     */
+    private final boolean schemaInclude;
+
+    private final boolean ignoreParseErrors;
+
+    public DebeziumJsonDeserializationSchema(
+            SeaTunnelRowType rowType, boolean schemaInclude, boolean ignoreParseErrors) {
+        this.rowType = rowType;
+        this.schemaInclude = schemaInclude;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(
+                        false, ignoreParseErrors, createJsonRowType(rowType, schemaInclude));
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+
+        try {
+            SeaTunnelRow row = jsonDeserializer.deserialize(message);

Review Comment:
   For reference, see:
   
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java#L116



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/seatunnel/pull/3981#issuecomment-1570221823

   <img width="1457" alt="image" src="https://github.com/apache/seatunnel/assets/14371345/71f9afe9-aed4-4c70-9f3e-9b8a1df0485c">
   




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1126095803


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class DebeziumJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Debezium Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private final SeaTunnelRowType rowType;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    /**
+     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
+     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
+     * information, but we just ignore "schema" and extract data from "payload".
+     */
+    private final boolean schemaInclude;
+
+    private final boolean ignoreParseErrors;
+
+    public DebeziumJsonDeserializationSchema(
+            SeaTunnelRowType rowType, boolean schemaInclude, boolean ignoreParseErrors) {
+        this.rowType = rowType;
+        this.schemaInclude = schemaInclude;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(
+                        false, ignoreParseErrors, createJsonRowType(rowType, schemaInclude));
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+
+        try {
+            SeaTunnelRow row = jsonDeserializer.deserialize(message);

Review Comment:
   For reference, see:
   
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java#L116
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java#L265
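
The `schemaInclude` Javadoc in the hunk quoted above says that, when "value.converter.schemas.enable" is on, the record carries a "schema" part that is ignored and the data is read from "payload". A minimal sketch of that unwrapping step follows, assuming the JSON has already been parsed into nested `Map`s; the class name `DebeziumEnvelopeSketch` and the method `payloadOf` are hypothetical and are not part of the PR, which delegates parsing to `JsonDeserializationSchema`.

```java
import java.util.Map;

public class DebeziumEnvelopeSketch {

    /**
     * Returns the map that holds "before", "after" and "op".
     * With schemaInclude=true the record is {"schema": {...}, "payload": {...}}
     * and only "payload" is used; otherwise the record itself is the payload.
     * (Hypothetical helper, for illustration only.)
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> payloadOf(Map<String, Object> record, boolean schemaInclude) {
        if (!schemaInclude) {
            return record;
        }
        Object payload = record.get("payload");
        if (!(payload instanceof Map)) {
            throw new IllegalArgumentException(
                    "Expected a \"payload\" object in the Debezium record");
        }
        return (Map<String, Object>) payload;
    }
}
```

Keeping the unwrapping in one place would let the rest of the deserializer handle both record shapes the same way.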





[GitHub] [incubator-seatunnel] TaoZex closed pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex closed pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka
URL: https://github.com/apache/incubator-seatunnel/pull/3981




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1465756321

   Please check the CI error.




[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1133638605


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class DebeziumJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Debezium Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private final SeaTunnelRowType rowType;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    /**
+     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
+     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
+     * information, but we just ignore "schema" and extract data from "payload".
+     */
+    private final boolean schemaInclude;
+
+    private final boolean ignoreParseErrors;
+
+    public DebeziumJsonDeserializationSchema(
+            SeaTunnelRowType rowType, boolean schemaInclude, boolean ignoreParseErrors) {
+        this.rowType = rowType;
+        this.schemaInclude = schemaInclude;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(
+                        false, ignoreParseErrors, createJsonRowType(rowType, schemaInclude));
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {

Review Comment:
   Thanks for your advice.





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1133636742


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class DebeziumJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Debezium Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private final SeaTunnelRowType rowType;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    /**
+     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
+     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
+     * information, but we just ignore "schema" and extract data from "payload".
+     */
+    private final boolean schemaInclude;
+
+    private final boolean ignoreParseErrors;
+
+    public DebeziumJsonDeserializationSchema(
+            SeaTunnelRowType rowType, boolean schemaInclude, boolean ignoreParseErrors) {
+        this.rowType = rowType;
+        this.schemaInclude = schemaInclude;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(
+                        false, ignoreParseErrors, createJsonRowType(rowType, schemaInclude));
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {

Review Comment:
   @TaoZex 





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1134817978


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE;
+
+public class DebeziumJsonSerializationSchema implements SerializationSchema {

Review Comment:
   Please remove this class.







[GitHub] [seatunnel] TaoZex closed pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex closed pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka
URL: https://github.com/apache/seatunnel/pull/3981






[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1407966920

   
   Please add documentation in docs/en/connector-v2/formats/debezium-json.md and link it to the Kafka source docs.
   
   




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#discussion_r1133636370


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.debezium;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+public class DebeziumJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Debezium Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private final SeaTunnelRowType rowType;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    /**
+     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
+     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
+     * information, but we just ignore "schema" and extract data from "payload".
+     */
+    private final boolean schemaInclude;
+
+    private final boolean ignoreParseErrors;
+
+    public DebeziumJsonDeserializationSchema(
+            SeaTunnelRowType rowType, boolean schemaInclude, boolean ignoreParseErrors) {
+        this.rowType = rowType;
+        this.schemaInclude = schemaInclude;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(
+                        false, ignoreParseErrors, createJsonRowType(rowType, schemaInclude));
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {

Review Comment:
   Example:
   ```java
   public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
       if (message == null || message.length == 0) {
           // skip tombstone messages
           return;
       }

       ObjectNode jsonNode = (ObjectNode) convertBytes(message);
       String op = jsonNode.get("op").asText();
       switch (op) {
           case "r": // snapshot read
           case "c": // insert
               SeaTunnelRow insert = convertJsonNode(jsonNode.get("after"));
               insert.setRowKind(RowKind.INSERT);
               out.collect(insert);
               break;
           case "u": // update: emit the old row image, then the new one
               SeaTunnelRow before = convertJsonNode(jsonNode.get("before"));
               before.setRowKind(RowKind.UPDATE_BEFORE);
               out.collect(before);

               SeaTunnelRow after = convertJsonNode(jsonNode.get("after"));
               after.setRowKind(RowKind.UPDATE_AFTER);
               out.collect(after);
               break;
           case "d": // delete: the removed row is carried in "before"
               SeaTunnelRow delete = convertJsonNode(jsonNode.get("before"));
               delete.setRowKind(RowKind.DELETE);
               out.collect(delete);
               break;
       }
   }
   ```
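
The mapping suggested in the example above can also be sketched without any SeaTunnel or Jackson types. The snippet below is only an illustration under stated assumptions: `DebeziumOpSketch`, `Kind`, `Change` and `toChanges` are hypothetical names standing in for `RowKind`, `SeaTunnelRow` and the collector, and rows are modelled as plain `Map`s. It also adds a null check on "before", which is the situation the PR's `REPLICA_IDENTITY_EXCEPTION` message describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DebeziumOpSketch {

    /** Hypothetical stand-in for SeaTunnel's RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Hypothetical stand-in for a row tagged with its change kind. */
    static final class Change {
        final Kind kind;
        final Map<String, Object> fields;

        Change(Kind kind, Map<String, Object> fields) {
            this.kind = kind;
            this.fields = fields;
        }
    }

    /** Maps one Debezium event (op, before, after) to the changes it produces. */
    static List<Change> toChanges(
            String op, Map<String, Object> before, Map<String, Object> after) {
        List<Change> out = new ArrayList<>();
        switch (op) {
            case "r": // snapshot read
            case "c": // insert
                out.add(new Change(Kind.INSERT, after));
                break;
            case "u": // update: old image first, then new image
                requireBefore(op, before);
                out.add(new Change(Kind.UPDATE_BEFORE, before));
                out.add(new Change(Kind.UPDATE_AFTER, after));
                break;
            case "d": // delete: the removed row is in "before"
                requireBefore(op, before);
                out.add(new Change(Kind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
        return out;
    }

    /** Fails fast when an update or delete event carries no "before" image. */
    private static void requireBefore(String op, Map<String, Object> before) {
        if (before == null) {
            throw new IllegalStateException(
                    "The \"before\" field of " + op + " message is null");
        }
    }
}
```

Whether an unknown `op` or a missing "before" should throw or be skipped would presumably depend on the `ignoreParseErrors` flag; the sketch always throws to keep the control flow visible.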





[GitHub] [seatunnel] hailin0 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3981:
URL: https://github.com/apache/seatunnel/pull/3981#issuecomment-1648960791

   Sorry, this PR has been merged
   
   https://github.com/apache/seatunnel/pull/5066
   
   




[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on PR #3981:
URL: https://github.com/apache/incubator-seatunnel/pull/3981#issuecomment-1527037729

   Can you rebase onto the dev branch and push again?




[GitHub] [seatunnel] zhilinli123 commented on pull request #3981: [Feature][connector][kafka] Support read debezium format message from kafka

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #3981:
URL: https://github.com/apache/seatunnel/pull/3981#issuecomment-1588843307

   Please check the CI, @TaoZex.



