Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2023/01/14 15:24:52 UTC

[GitHub] [incubator-seatunnel] MonsterChenzhuo opened a new pull request, #3950: [Feature][formats][canal] Support read canal format message

MonsterChenzhuo opened a new pull request, #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950

   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or do not need tests for this reason:
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update the change log in the connector document. For more details, refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1072048360


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+public class CanalJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+
+    private static final String FIELD_DATA = "data";
+
+    private static final String FIELD_TYPE = "type";
+
+    private static final String OP_INSERT = "INSERT";
+
+    private static final String OP_UPDATE = "UPDATE";
+
+    private static final String OP_DELETE = "DELETE";
+
+    private static final String OP_CREATE = "CREATE";
+
+    private String database;
+
+    private String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    private final SeaTunnelRowType physicalRowType;
+
+    public CanalJsonDeserializationSchema(SeaTunnelRowType physicalRowType,
+                                          String database,
+                                          String table,
+                                          boolean ignoreParseErrors
+                                          ) {
+        this.physicalRowType = physicalRowType;
+        final SeaTunnelRowType jsonRowType = createJsonRowType(physicalRowType);
+        this.jsonDeserializer = new JsonDeserializationSchema(
+            false,
+            ignoreParseErrors,
+            jsonRowType
+        );
+        this.database = database;
+        this.table = table;
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+        return convertJsonNode(convertBytes(message));
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.physicalRowType;
+    }
+
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {

Review Comment:
   Example: emit one row per change record through the collector instead of returning a single row.
   
   ```java
   public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
           JsonNode jsonNode = convert(message);
           ArrayNode dataNode = (ArrayNode) jsonNode.get("data");
           // A JsonNode cannot be used in a switch; compare its text value instead.
           switch (jsonNode.get("type").asText()) {
               case OP_UPDATE:
                   ArrayNode oldNode = (ArrayNode) jsonNode.get("old");
                   for (int i = 0; i < dataNode.size(); i++) {
                       JsonNode data = dataNode.get(i);
                       JsonNode old = oldNode.get(i);
   
                       // "old" holds the values before the update, "data" the values after it.
                       SeaTunnelRow updateBefore = convert(jsonNode, old);
                       updateBefore.setRowKind(RowKind.UPDATE_BEFORE);
                       SeaTunnelRow updateAfter = convert(jsonNode, data);
                       updateAfter.setRowKind(RowKind.UPDATE_AFTER);
   
                       out.collect(updateBefore);
                       out.collect(updateAfter);
                   }
                   break;
               case OP_INSERT:
                   for (int i = 0; i < dataNode.size(); i++) {
                       JsonNode data = dataNode.get(i);
                       SeaTunnelRow insert = convert(jsonNode, data);
                       out.collect(insert);
                   }
                   break;
               case OP_DELETE:
                   for (int i = 0; i < dataNode.size(); i++) {
                       JsonNode data = dataNode.get(i);
                       SeaTunnelRow delete = convert(jsonNode, data);
                       delete.setRowKind(RowKind.DELETE);
                       out.collect(delete);
                   }
                   break;
           }
       }
   ```
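
For reference, the expansion can also be sketched without Jackson or SeaTunnel types. The class below is a hypothetical, JDK-only model: `CanalChangeExpander`, `Kind`, and `Change` are illustrative names, not part of this PR. It assumes that Canal's `old` array carries only the columns that changed, so the before-image is rebuilt by overlaying `old` on `data`; please verify that assumption against real Canal output before relying on it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, JDK-only model of expanding one Canal message into change rows. */
final class CanalChangeExpander {

    /** The four row kinds a changelog stream distinguishes. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One emitted row: its kind plus a column-name to value mapping. */
    static final class Change {
        final Kind kind;
        final Map<String, Object> row;

        Change(Kind kind, Map<String, Object> row) {
            this.kind = kind;
            this.row = row;
        }
    }

    /** Expands the "data" (and, for updates, "old") arrays of one message. */
    static List<Change> expand(String type,
                               List<Map<String, Object>> data,
                               List<Map<String, Object>> old) {
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            Map<String, Object> after = data.get(i);
            switch (type) {
                case "INSERT":
                    out.add(new Change(Kind.INSERT, after));
                    break;
                case "UPDATE":
                    // Assumption: "old" lists only the changed columns, so the
                    // before-image is the after-image overlaid with the old values.
                    Map<String, Object> before = new LinkedHashMap<>(after);
                    before.putAll(old.get(i));
                    out.add(new Change(Kind.UPDATE_BEFORE, before));
                    out.add(new Change(Kind.UPDATE_AFTER, after));
                    break;
                case "DELETE":
                    out.add(new Change(Kind.DELETE, after));
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported Canal type: " + type);
            }
        }
        return out;
    }
}
```

An update to a single row therefore produces two entries, `UPDATE_BEFORE` followed by `UPDATE_AFTER`, which is why a method returning one `SeaTunnelRow` cannot express it.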



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+public class CanalJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+
+    private static final String FIELD_DATA = "data";
+
+    private static final String FIELD_TYPE = "type";
+
+    private static final String OP_INSERT = "INSERT";
+
+    private static final String OP_UPDATE = "UPDATE";
+
+    private static final String OP_DELETE = "DELETE";
+
+    private static final String OP_CREATE = "CREATE";
+
+    private String database;
+
+    private String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    private final SeaTunnelRowType physicalRowType;
+
+    public CanalJsonDeserializationSchema(SeaTunnelRowType physicalRowType,
+                                          String database,
+                                          String table,
+                                          boolean ignoreParseErrors
+                                          ) {
+        this.physicalRowType = physicalRowType;
+        final SeaTunnelRowType jsonRowType = createJsonRowType(physicalRowType);
+        this.jsonDeserializer = new JsonDeserializationSchema(
+            false,
+            ignoreParseErrors,
+            jsonRowType
+        );
+        this.database = database;
+        this.table = table;
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+        return convertJsonNode(convertBytes(message));
+    }

Review Comment:
   ```suggestion
       public SeaTunnelRow deserialize(byte[] message) throws IOException {
           throw new UnsupportedOperationException();
       }
   ```
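
The reasoning behind this suggestion can be illustrated with a small JDK-only sketch. The `Deserializer` interface and `LineDeserializer` class are hypothetical stand-ins, not the SeaTunnel `DeserializationSchema`; a plain `Consumer` plays the role of the collector. When one message may produce several rows, the single-row method has no sensible return value, so it rejects the call and all work goes through the collector-based overload.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical sketch; the interface below is not the SeaTunnel DeserializationSchema. */
final class CollectorOnlyDeserializerSketch {

    interface Deserializer<T> {
        T deserialize(byte[] message);

        void deserialize(byte[] message, Consumer<T> out);
    }

    /** Treats each non-empty line of the message as one output row. */
    static final class LineDeserializer implements Deserializer<String> {

        @Override
        public String deserialize(byte[] message) {
            // One message may yield many rows, so a single return value cannot represent it.
            throw new UnsupportedOperationException("Use deserialize(byte[], Consumer) instead");
        }

        @Override
        public void deserialize(byte[] message, Consumer<String> out) {
            if (message == null) {
                return;
            }
            for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
                if (!line.isEmpty()) {
                    out.accept(line);
                }
            }
        }
    }
}
```

Failing fast in the single-row method makes an accidental call visible immediately, rather than silently dropping every row after the first.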





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1093973956


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    //----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest";
+
+    private static final String KAFKA_TOPIC = "test-canal-sink";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    //----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties")
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+
+        LOG.info("The third stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        inventoryDatabase.createAndInitialize();
+
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaConsumer);
+    }
+
+    @TestTemplate
+    public void testCannalToKafakCannalFormatAnalysis(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        ArrayList<Object> result = new ArrayList<>();
+        List<String> expectedResult =
+                Arrays.asList(
+                        "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}"

Review Comment:
   Please include insert, update, and delete actions in the expected results,
   and cover all MySQL data types.





[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1070885543


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+/**
+ * Format factory for providing configured instances of Canal JSON to RowData {@link
+ * DeserializationSchema}.
+ */
+
+public class CanalJsonFormatFactory

Review Comment:
   Please implement `DeserializationFormatFactory` and `SerializationFormatFactory` separately, because the `#optionRule()` is different.
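
A rough, JDK-only sketch of the split follows. The `FormatFactory` interface here is a hypothetical simplification in which an option rule is reduced to a set of optional keys; it is not the SeaTunnel factory API. The option names are also assumptions, loosely derived from the `database`, `table`, and `ignoreParseErrors` constructor parameters of `CanalJsonDeserializationSchema`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-ins; these are not the SeaTunnel factory interfaces. */
final class SplitFormatFactoriesSketch {

    /** Simplified option rule: only the set of optional keys a factory accepts. */
    interface FormatFactory {
        String identifier();

        Set<String> optionalOptions();
    }

    /** Reading side: accepts filters and error handling (option names are illustrative). */
    static final class CanalJsonDeserializationFactory implements FormatFactory {
        @Override
        public String identifier() {
            return "canal-json";
        }

        @Override
        public Set<String> optionalOptions() {
            return new LinkedHashSet<>(Arrays.asList(
                "ignore-parse-errors", "database.include", "table.include"));
        }
    }

    /** Writing side: none of the read-only options apply. */
    static final class CanalJsonSerializationFactory implements FormatFactory {
        @Override
        public String identifier() {
            return "canal-json";
        }

        @Override
        public Set<String> optionalOptions() {
            return Collections.emptySet();
        }
    }
}
```

With two classes, each side declares only the options it actually understands, so a sink configuration that sets a read-only option can be rejected during validation.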





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1090135326


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_console.conf:
##########
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test-cdc_mds"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "INT"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = canal-json
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Console {

Review Comment:
   Use the Assert sink to check the data rows and fields.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    //----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    //----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties")
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+
+        LOG.info("The first stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The second stage: Starting MySQL containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The third stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        inventoryDatabase.createAndInitialize();
+    }
+
+    @TestTemplate
+    public void testCannalToKafakCannalFormatAnalysis(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Review Comment:
   How do we validate the data output by the sink?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1383131013

   Please add an e2e test case that checks reading Canal messages from Kafka into SeaTunnel rows.
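
As a rough illustration of what such a test could assert, the sketch below expands one Canal UPDATE event into a before/after pair, following the merge logic in the quoted `deserialize` method. `ChangeRow` and `CanalUpdateSketch` are hypothetical stand-ins, not SeaTunnel classes, and the sample values are made up; real Canal JSON carries `data` and `old` as arrays of rows, which is simplified to single maps here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a changelog row; not a SeaTunnel class. */
final class ChangeRow {
    final String kind; // "UPDATE_BEFORE" or "UPDATE_AFTER" in this sketch
    final Map<String, Object> fields;

    ChangeRow(String kind, Map<String, Object> fields) {
        this.kind = kind;
        this.fields = fields;
    }
}

final class CanalUpdateSketch {

    /**
     * Expands one UPDATE event into a before/after pair.
     * "old" holds only the columns that changed, so unchanged columns
     * are copied from "data" into the before image.
     */
    static List<ChangeRow> expandUpdate(Map<String, Object> data, Map<String, Object> old) {
        Map<String, Object> before = new LinkedHashMap<>(data);
        before.putAll(old); // changed columns take their previous values
        List<ChangeRow> rows = new ArrayList<>();
        rows.add(new ChangeRow("UPDATE_BEFORE", before));
        rows.add(new ChangeRow("UPDATE_AFTER", new LinkedHashMap<>(data)));
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample event: only "weight" changed.
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 101);
        data.put("name", "scooter");
        data.put("weight", 5.15);
        Map<String, Object> old = new LinkedHashMap<>();
        old.put("weight", 3.14);

        for (ChangeRow row : expandUpdate(data, old)) {
            System.out.println(row.kind + " " + row.fields);
        }
    }
}
```

An e2e test could then compare the rows that reach the sink against the expected before/after images instead of only checking the job's exit code.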




[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1070302198


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class CanalJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+
+    private static final String OP_INSERT = "INSERT";
+
+    private static final String OP_UPDATE = "UPDATE";
+
+    private static final String OP_DELETE = "DELETE";
+
+    private static final String OP_CREATE = "CREATE";
+
+    private String database;
+
+    private String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    public CanalJsonDeserializationSchema(SeaTunnelRowType physicalRowType,
+                                          String database,
+                                          String table,
+                                          boolean ignoreParseErrors
+                                          ) {
+        final SeaTunnelRowType jsonRowType = createJsonRowType(physicalRowType);
+        this.jsonDeserializer = new JsonDeserializationSchema(
+            false,
+            ignoreParseErrors,
+            jsonRowType
+        );
+        this.database = database;
+        this.table = table;
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (message == null || message.length == 0) {
+            return;
+        }
+        try {
+            final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
+            if (database != null) {
+                if (!databasePattern
+                    .matcher(root.get("database").asText())
+                    .matches()) {
+                    return;
+                }
+            }
+            if (table != null) {
+                if (!tablePattern
+                    .matcher(root.get("table").asText())
+                    .matches()) {
+                    return;
+                }
+            }
+            SeaTunnelRow row = jsonDeserializer.convertToRowData(root);
+            SeaTunnelRow data = (SeaTunnelRow) row.getField(0);
+            String type = row.getField(2).toString();
+            if (OP_INSERT.equals(type)) {
+                out.collect(row);
+            } else if (OP_UPDATE.equals(type)) {
+                SeaTunnelRow after = (SeaTunnelRow) row.getField(0);
+                SeaTunnelRow before = (SeaTunnelRow) row.getField(1);
+                final JsonNode oldField = root.get(FIELD_OLD);
+                for (int f = 0; f < fieldCount; f++) {
+                    if (before.isNullAt(f) && oldField.findValue(fieldNames[f]) == null) {
+                        // fields in "old" (before) means the fields are changed
+                        // fields not in "old" (before) means the fields are not changed
+                        // so we just copy the not changed fields into before
+                        before.setField(f, after.getField(f));
+                    }
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                out.collect(before);
+                out.collect(after);
+            } else if (OP_DELETE.equals(type)) {
+                // "data" field is an array of row, contains deleted rows
+                data.setRowKind(RowKind.DELETE);
+                out.collect(data);
+            } else if (OP_CREATE.equals(type)) {
+                // "data" field is null and "type" is "CREATE" which means
+                // this is a DDL change event, and we should skip it.
+                return;
+            } else {
+                if (!ignoreParseErrors) {
+                    throw new IOException(
+                        format(
+                            "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
+                            type, new String(message)));
+                }
+            }
+
+        } catch (Throwable t) {
+            // a big try catch to protect the processing.
+            if (!ignoreParseErrors) {
+                throw new IOException(
+                    format("Corrupt Canal JSON message '%s'.", new String(message)), t);
+            }
+        }
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return null;
+    }
+
+    private static SeaTunnelRowType createJsonRowType(
+        SeaTunnelRowType physicalDataType) {
+        // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
+        SeaTunnelRowType root =
+            new SeaTunnelRowType(new String[]{"data", "old", "type", "database", "table"}, new SeaTunnelDataType[]{physicalDataType, physicalDataType, STRING_TYPE, STRING_TYPE, STRING_TYPE});
+        return root;
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Builder
+    // ------------------------------------------------------------------------------------------
+
+    /** Creates a builder for building a {@link CanalJsonDeserializationSchema}. */
+    public static Builder builder(
+        SeaTunnelRowType physicalDataType) {
+        return new Builder(physicalDataType);
+    }
+
+    public static class Builder{

Review Comment:
   Use `lombok` instead of a hand-written builder.



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class CanalJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+
+    private static final String OP_INSERT = "INSERT";
+
+    private static final String OP_UPDATE = "UPDATE";
+
+    private static final String OP_DELETE = "DELETE";
+
+    private static final String OP_CREATE = "CREATE";
+
+    private String database;
+
+    private String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    public CanalJsonDeserializationSchema(SeaTunnelRowType physicalRowType,
+                                          String database,
+                                          String table,
+                                          boolean ignoreParseErrors
+                                          ) {
+        final SeaTunnelRowType jsonRowType = createJsonRowType(physicalRowType);
+        this.jsonDeserializer = new JsonDeserializationSchema(
+            false,
+            ignoreParseErrors,
+            jsonRowType
+        );
+        this.database = database;
+        this.table = table;
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (message == null || message.length == 0) {
+            return;
+        }
+        try {
+            final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
+            if (database != null) {
+                if (!databasePattern
+                    .matcher(root.get("database").asText())
+                    .matches()) {
+                    return;
+                }
+            }
+            if (table != null) {
+                if (!tablePattern
+                    .matcher(root.get("table").asText())
+                    .matches()) {
+                    return;
+                }
+            }
+            SeaTunnelRow row = jsonDeserializer.convertToRowData(root);
+            SeaTunnelRow data = (SeaTunnelRow) row.getField(0);
+            String type = row.getField(2).toString();
+            if (OP_INSERT.equals(type)) {
+                out.collect(row);
+            } else if (OP_UPDATE.equals(type)) {
+                SeaTunnelRow after = (SeaTunnelRow) row.getField(0);
+                SeaTunnelRow before = (SeaTunnelRow) row.getField(1);
+                final JsonNode oldField = root.get(FIELD_OLD);
+                for (int f = 0; f < fieldCount; f++) {
+                    if (before.isNullAt(f) && oldField.findValue(fieldNames[f]) == null) {
+                        // fields in "old" (before) means the fields are changed
+                        // fields not in "old" (before) means the fields are not changed
+                        // so we just copy the not changed fields into before
+                        before.setField(f, after.getField(f));
+                    }
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                out.collect(before);
+                out.collect(after);
+            } else if (OP_DELETE.equals(type)) {
+                // "data" field is an array of row, contains deleted rows
+                data.setRowKind(RowKind.DELETE);
+                out.collect(data);
+            } else if (OP_CREATE.equals(type)) {
+                // "data" field is null and "type" is "CREATE" which means
+                // this is a DDL change event, and we should skip it.
+                return;
+            } else {
+                if (!ignoreParseErrors) {
+                    throw new IOException(
+                        format(
+                            "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
+                            type, new String(message)));
+                }
+            }
+
+        } catch (Throwable t) {
+            // a big try catch to protect the processing.
+            if (!ignoreParseErrors) {
+                throw new IOException(

Review Comment:
   Use the unified exception.
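
One way to read this request is sketched below: a single exception type carrying an error code replaces the ad-hoc `IOException`, `RuntimeException` and `UnsupportedOperationException` in the format classes. `FormatErrorCode`, `FormatException` and `UnifiedExceptionSketch` are hypothetical names invented for illustration; the project's actual exception classes and error codes are not shown in this thread and may differ.

```java
/** Hypothetical error codes; the real project may define its own set. */
enum FormatErrorCode {
    JSON_PARSE_FAILED("FORMAT-01", "Failed to parse Canal JSON message"),
    UNSUPPORTED_OPERATION("FORMAT-02", "Unsupported operation type");

    final String code;
    final String description;

    FormatErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }
}

/** Hypothetical unified exception that always carries an error code. */
final class FormatException extends RuntimeException {
    private final FormatErrorCode errorCode;

    FormatException(FormatErrorCode errorCode, String detail, Throwable cause) {
        super("ErrorCode:[" + errorCode.code + "], ErrorDescription:["
            + errorCode.description + "] - " + detail, cause);
        this.errorCode = errorCode;
    }

    FormatErrorCode getErrorCode() {
        return errorCode;
    }
}

final class UnifiedExceptionSketch {

    /** Returns the type if it is a known row operation, otherwise throws the unified exception. */
    static String requireKnownType(String type) {
        switch (type) {
            case "INSERT":
            case "UPDATE":
            case "DELETE":
                return type;
            default:
                throw new FormatException(
                    FormatErrorCode.UNSUPPORTED_OPERATION,
                    "Unknown \"type\" value \"" + type + "\"",
                    null);
        }
    }
}
```

Callers can then catch one exception type and branch on `getErrorCode()` rather than on the Java exception class.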



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.RowKind.INSERT;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+public class CanalJsonSerializationSchema implements SerializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
+
+    private transient SeaTunnelRow reuse;
+
+
+    private final JsonSerializationSchema jsonSerializer;
+
+    public CanalJsonSerializationSchema(
+        SeaTunnelRowType rowType) {
+        this.jsonSerializer = new JsonSerializationSchema(createJsonRowType(rowType));
+        this.reuse = new SeaTunnelRow(2);
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        try {
+            String opType = rowKind2String(row.getRowKind());
+            reuse.setField(0, row);
+            reuse.setField(1, opType);
+            return jsonSerializer.serialize(reuse);
+        } catch (Throwable t) {
+            throw new RuntimeException("Could not serialize row '" + row + "'.", t);

Review Comment:
   Use the unified exception.



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class CanalJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+
+    private static final String OP_INSERT = "INSERT";
+
+    private static final String OP_UPDATE = "UPDATE";
+
+    private static final String OP_DELETE = "DELETE";
+
+    private static final String OP_CREATE = "CREATE";
+
+    private String database;
+
+    private String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    public CanalJsonDeserializationSchema(SeaTunnelRowType physicalRowType,
+                                          String database,
+                                          String table,
+                                          boolean ignoreParseErrors
+                                          ) {
+        final SeaTunnelRowType jsonRowType = createJsonRowType(physicalRowType);
+        this.jsonDeserializer = new JsonDeserializationSchema(
+            false,
+            ignoreParseErrors,
+            jsonRowType
+        );
+        this.database = database;
+        this.table = table;
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (message == null || message.length == 0) {
+            return;
+        }
+        try {
+            final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
+            if (database != null) {
+                if (!databasePattern
+                    .matcher(root.get("database").asText())
+                    .matches()) {
+                    return;
+                }
+            }
+            if (table != null) {
+                if (!tablePattern
+                    .matcher(root.get("table").asText())
+                    .matches()) {
+                    return;
+                }
+            }
+            SeaTunnelRow row = jsonDeserializer.convertToRowData(root);
+            SeaTunnelRow data = (SeaTunnelRow) row.getField(0);
+            String type = row.getField(2).toString();
+            if (OP_INSERT.equals(type)) {
+                out.collect(row);
+            } else if (OP_UPDATE.equals(type)) {
+                SeaTunnelRow after = (SeaTunnelRow) row.getField(0);
+                SeaTunnelRow before = (SeaTunnelRow) row.getField(1);
+                final JsonNode oldField = root.get(FIELD_OLD);
+                for (int f = 0; f < fieldCount; f++) {
+                    if (before.isNullAt(f) && oldField.findValue(fieldNames[f]) == null) {
+                        // fields in "old" (before) means the fields are changed
+                        // fields not in "old" (before) means the fields are not changed
+                        // so we just copy the not changed fields into before
+                        before.setField(f, after.getField(f));
+                    }
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                out.collect(before);
+                out.collect(after);
+            } else if (OP_DELETE.equals(type)) {
+                // "data" field is an array of row, contains deleted rows
+                data.setRowKind(RowKind.DELETE);
+                out.collect(data);
+            } else if (OP_CREATE.equals(type)) {
+                // "data" field is null and "type" is "CREATE" which means
+                // this is a DDL change event, and we should skip it.
+                return;
+            } else {
+                if (!ignoreParseErrors) {
+                    throw new IOException(

Review Comment:
   Use the unified exception.



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.RowKind.INSERT;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+public class CanalJsonSerializationSchema implements SerializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
+
+    private transient SeaTunnelRow reuse;
+
+
+    private final JsonSerializationSchema jsonSerializer;
+
+    public CanalJsonSerializationSchema(
+        SeaTunnelRowType rowType) {
+        this.jsonSerializer = new JsonSerializationSchema(createJsonRowType(rowType));
+        this.reuse = new SeaTunnelRow(2);
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        try {
+            String opType = rowKind2String(row.getRowKind());
+            reuse.setField(0, row);
+            reuse.setField(1, opType);
+            return jsonSerializer.serialize(reuse);
+        } catch (Throwable t) {
+            throw new RuntimeException("Could not serialize row '" + row + "'.", t);
+        }
+    }
+
+    private String rowKind2String(RowKind rowKind) {
+        switch (rowKind) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return OP_INSERT;
+            case UPDATE_BEFORE:
+            case DELETE:
+                return OP_DELETE;
+            default:
+                throw new UnsupportedOperationException(

Review Comment:
   Use the unified exception.



##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java:
##########
@@ -31,6 +31,7 @@
     public static final BasicType<Float> FLOAT_TYPE = new BasicType<>(Float.class, SqlType.FLOAT);
     public static final BasicType<Double> DOUBLE_TYPE = new BasicType<>(Double.class, SqlType.DOUBLE);
     public static final BasicType<Void> VOID_TYPE = new BasicType<>(Void.class, SqlType.NULL);
+    public static final BasicType<SeaTunnelRow> ROW_TYPE = new BasicType<>(SeaTunnelRow.class, SqlType.ROW);

Review Comment:
   Revert this change.





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1096890597


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -80,6 +82,8 @@ private static SerializationSchema createSerializationSchema(SeaTunnelRowType ro
                     .seaTunnelRowType(rowType)
                     .delimiter(delimiter)
                     .build();
+        } else if (CANNAL_FORMAT.equals(format)) {

Review Comment:
   A `switch` is better here.
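
A minimal sketch of the suggested shape, assuming the format is a plain string. The format values (`"json"`, `"text"`, `"canal-json"`) and the returned schema names are placeholders chosen for illustration; the connector's real constants, such as `CANNAL_FORMAT`, and their values are not visible in this diff.

```java
final class FormatSwitchSketch {

    /**
     * Maps a format name to the schema that would handle it.
     * Returns a class name as a string only to keep the sketch self-contained.
     */
    static String chooseSchema(String format) {
        // Note: switching on a null String throws NullPointerException.
        switch (format) {
            case "json":
                return "JsonSerializationSchema";
            case "text":
                return "TextSerializationSchema";
            case "canal-json":
                return "CanalJsonSerializationSchema";
            default:
                throw new IllegalArgumentException("Unsupported format: " + format);
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseSchema("canal-json"));
    }
}
```

Compared with a growing `else if` chain, each new format becomes one more `case`, and the `default` branch makes the unsupported-format path explicit.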



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -210,6 +212,10 @@ private void setDeserialization(Config config) {
                     .seaTunnelRowType(typeInfo)
                     .delimiter(delimiter)
                     .build();
+            } else if (CANNAL_FORMAT.equals(format)) {

Review Comment:
   The same as above.



##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -58,6 +58,21 @@
             <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.5.0</version>
+        </dependency>

Review Comment:
   Agree with @hailin0





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1418472059

   @hailin0 @TyrantLucifer Thanks for your review. I have made the changes.




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1093942584


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    //----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest";
+
+    private static final String KAFKA_TOPIC = "test-canal-sink";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    //----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties")
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+
+        LOG.info("The third stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        inventoryDatabase.createAndInitialize();
+
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaConsumer);
+    }
+
+    @TestTemplate
+    public void testCannalToKafakCannalFormatAnalysis(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        ArrayList<Object> result = new ArrayList<>();
+        List<String> expectedResult =
+                Arrays.asList(
+                        "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}"

Review Comment:
   Move the expected data to a file
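
A minimal sketch of how the expected rows could be loaded once they live in a file, one JSON document per line. `ExpectedRowsSketch` and `loadExpected` are hypothetical names. In the test, the reader would presumably be opened on a classpath resource, which is an assumption and not shown here.

```java
import java.io.BufferedReader;
import java.util.List;
import java.util.stream.Collectors;

class ExpectedRowsSketch {
    // Reads one expected record per line, trimming whitespace and skipping blank lines.
    static List<String> loadExpected(BufferedReader reader) {
        return reader.lines()
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.toList());
    }
}
```

This keeps the long escaped JSON literals out of the test class, so the data can be edited without touching Java code.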





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1416934678

   @hailin0 Thanks for your review. I have made the changes.




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1418471860

   The es e2e and jdbc e2e jobs in CI are unstable and fail intermittently (screenshot: https://user-images.githubusercontent.com/60029759/216880154-059afd4b-eec9-46ec-9c8c-24cea41fea06.png).
   




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1418475871

   @hailin0 @TyrantLucifer Thanks for your review. I have made the changes.




[GitHub] [incubator-seatunnel] TyrantLucifer merged pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer merged PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1093942584


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java:
##########
@@ -51,22 +49,12 @@ public DeserializationFormat createDeserializationFormat(TableFactoryContext con
         boolean ignoreParseErrors = JsonFormatOptions.getIgnoreParseErrors(options);
 
         // TODO config SeaTunnelRowType
-        return new DeserializationFormat() {
-            @Override
-            public DeserializationSchema createDeserializationSchema() {
-                return new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null);
-            }
-        };
+        return () -> new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null);

Review Comment:
   revert



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+/**
+ * Format factory for providing configured instances of Canal JSON to RowData {@link
+ * DeserializationSchema}.
+ */
+
+public class CanalJsonFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "canal-json";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        // TODO config option rules
+        return OptionRule.builder().build();
+    }
+
+    @Override
+    public SerializationFormat createSerializationFormat(TableFactoryContext context) {
+        return () -> new CanalJsonSerializationSchema(null);

Review Comment:
   A lambda cannot be serialized and deserialized
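
One possible reading of this comment, sketched with plain Java serialization: a lambda whose target interface does not extend `Serializable` is rejected by `ObjectOutputStream`, while a named class that implements `Serializable` is accepted. `PlainFormat` and `NamedFormat` are hypothetical stand-ins, not the SeaTunnel interfaces, and whether the real format interfaces extend `Serializable` is not shown in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class LambdaSerializationSketch {
    // Hypothetical stand-in for a format interface that does NOT extend Serializable.
    interface PlainFormat {
        String createSchema();
    }

    // Named implementation that opts in to Java serialization explicitly.
    static final class NamedFormat implements PlainFormat, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public String createSchema() {
            return "canal-json";
        }
    }

    // Returns true if Java serialization accepts the object, false if it is rejected.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Even when the target interface is serializable, a serialized lambda is tied to compiler-generated details of the class that defines it, which is one reason reviewers often prefer a named or anonymous class for objects shipped between processes.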



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java:
##########
@@ -51,22 +49,12 @@ public DeserializationFormat createDeserializationFormat(TableFactoryContext con
         boolean ignoreParseErrors = JsonFormatOptions.getIgnoreParseErrors(options);
 
         // TODO config SeaTunnelRowType
-        return new DeserializationFormat() {
-            @Override
-            public DeserializationSchema createDeserializationSchema() {
-                return new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null);
-            }
-        };
+        return () -> new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null);
     }
 
     @Override
     public SerializationFormat createSerializationFormat(TableFactoryContext context) {
         // TODO config SeaTunnelRowType
-        return new SerializationFormat() {
-            @Override
-            public SerializationSchema createSerializationSchema() {
-                return new JsonSerializationSchema(null);
-            }
-        };
+        return () -> new JsonSerializationSchema(null);

Review Comment:
   revert



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+/**
+ * Format factory for providing configured instances of Canal JSON to RowData {@link
+ * DeserializationSchema}.
+ */
+
+public class CanalJsonFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "canal-json";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        // TODO config option rules
+        return OptionRule.builder().build();
+    }
+
+    @Override
+    public SerializationFormat createSerializationFormat(TableFactoryContext context) {
+        return () -> new CanalJsonSerializationSchema(null);
+    }
+
+    @Override
+    public DeserializationFormat createDeserializationFormat(TableFactoryContext context) {
+        Map<String, String> options = context.getOptions();
+        boolean ignoreParseErrors = CanalJsonFormatOptions.getIgnoreParseErrors(options);
+        String  databaseInclude = CanalJsonFormatOptions.getDatabaseInclude(options);
+        String  tableInclude = CanalJsonFormatOptions.getTableInclude(options);
+
+        // TODO config SeaTunnelRowType
+        return () -> new CanalJsonDeserializationSchema(null, databaseInclude, tableInclude, ignoreParseErrors);

Review Comment:
   A lambda cannot be serialized and deserialized



##########
docs/en/connector-v2/formats/canal-json.md:
##########
@@ -0,0 +1,109 @@
+# Canal Format
+
+Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema
+
+Canal is a CDC (Changelog Data Capture) tool that can stream changes in real time from MySQL into other systems. Canal provides a unified format schema for changelogs and supports serializing messages as JSON or protobuf (protobuf is the default format for Canal).
+
+SeaTunnel can interpret Canal JSON messages as INSERT/UPDATE/DELETE messages in the SeaTunnel system. This is useful in many cases, such as
+
+    synchronizing incremental data from databases to other systems
+    auditing logs
+    real-time materialized views on databases
+    temporal join changing history of a database table and so on.
+
+SeaTunnel can also encode its INSERT/UPDATE/DELETE messages as Canal JSON messages and emit them to storage such as Kafka. However, SeaTunnel currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message, so it encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Canal messages, respectively.
+
+# How to use Canal format
+Canal provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table:
+```bash
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+}
+```
+Note: please refer to Canal documentation about the meaning of each fields.
+
+The MySQL products table has 4 columns (id, name, description and weight). 
+The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.15. 
+Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel to consume this topic and interpret the change events.
+
+```bash
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test-canal-source"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = canal-json
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "test-canal-sink"
+  }
+}
+```
+
+# Format Options

Review Comment:
   Move the Format Options section to the top of the document.



##########
docs/en/connector-v2/formats/canal-json.md:
##########
@@ -0,0 +1,109 @@
+# Canal Format
+
+Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema
+
+Canal is a CDC (Changelog Data Capture) tool that can stream changes in real-time from MySQL into other systems. Canal provides a unified format schema for changelog and supports to serialize messages using JSON and protobuf (protobuf is the default format for Canal).
+
+Seatunnel supports to interpret Canal JSON messages as INSERT/UPDATE/DELETE messages into seatunnel system. This is useful in many cases to leverage this feature, such as
+
+    synchronizing incremental data from databases to other systems
+    auditing logs
+    real-time materialized views on databases
+    temporal join changing history of a database table and so on.
+
+Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel as Canal JSON messages, and emit to storage like Kafka. However, currently Seatunnel can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Seatunnel encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Canal messages.
+
+# How to use Canal format
+Canal provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table:
+```bash
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+}
+```
+Note: please refer to Canal documentation about the meaning of each fields.
+
+The MySQL products table has 4 columns (id, name, description and weight). 
+The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.15. 
+Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel to consume this topic and interpret the change events.
+
+```bash
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test-canal-source"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = canal-json
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource

Review Comment:
   Remove these comment lines.
   



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    //----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest";
+
+    private static final String KAFKA_TOPIC = "test-canal-sink";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    //----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties")
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+
+        LOG.info("The third stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        inventoryDatabase.createAndInitialize();
+
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaConsumer);
+    }
+
+    @TestTemplate
+    public void testCannalToKafakCannalFormatAnalysis(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        ArrayList<Object> result = new ArrayList<>();
+        List<String> expectedResult =
+                Arrays.asList(
+                        "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}"
+                        );
+
+        ArrayList<String> topics = new ArrayList<>();
+        topics.add(KAFKA_TOPIC);
+        kafkaConsumer.subscribe(topics);
+        while (result.size() < 9) {

Review Comment:
   This is an infinite loop if fewer than 9 records ever arrive.
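
A minimal sketch of one way to bound such a loop, assuming a deadline is acceptable for the test. The `collect` helper and its `poller` parameter are hypothetical; in the test above the poller would wrap `kafkaConsumer.poll(...)`, which blocks for its own timeout, whereas the fake poller here returns immediately.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class BoundedPollSketch {

    // Collects records until expectedCount is reached or the timeout elapses,
    // whichever comes first, so the caller can never spin forever.
    static List<String> collect(Supplier<List<String>> poller, int expectedCount, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<String> result = new ArrayList<>();
        while (result.size() < expectedCount && System.nanoTime() - deadline < 0) {
            result.addAll(poller.get());
        }
        return result;
    }

    public static void main(String[] args) {
        // A poller that never yields records stands in for an empty topic.
        Supplier<List<String>> emptyPoller = Collections::emptyList;
        List<String> records = collect(emptyPoller, 9, Duration.ofMillis(200));
        System.out.println("collected " + records.size() + " of 9 records before the deadline");
    }
}
```

The test can then assert on the collected size and fail with a clear message instead of hanging the CI job.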



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java:
##########
@@ -51,22 +49,12 @@ public DeserializationFormat createDeserializationFormat(TableFactoryContext con
         boolean ignoreParseErrors = JsonFormatOptions.getIgnoreParseErrors(options);
 
         // TODO config SeaTunnelRowType
-        return new DeserializationFormat() {
-            @Override
-            public DeserializationSchema createDeserializationSchema() {
-                return new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null);
-            }
-        };
+        return () -> new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null);

Review Comment:
   A lambda cannot be serialized and deserialized here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1071948780


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -282,6 +290,15 @@ private void generateTestData(ProducerRecordConverter converter, int start, int
         }
     }
 
+    private void generateCanalTestData(String topic) {

Review Comment:
   Use a Canal server to generate the test data:
   
   https://github.com/alibaba/canal/wiki/Docker-QuickStart





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1093984225


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+public class CanalJsonSerializationSchema implements SerializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
+
+    private transient SeaTunnelRow reuse;
+
+
+    private final JsonSerializationSchema jsonSerializer;
+
+    public CanalJsonSerializationSchema(
+        SeaTunnelRowType rowType) {
+        this.jsonSerializer = new JsonSerializationSchema(createJsonRowType(rowType));
+        this.reuse = new SeaTunnelRow(2);
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        try {
+            String opType = rowKind2String(row.getRowKind());
+            reuse.setField(0, row);
+            reuse.setField(1, opType);
+            return jsonSerializer.serialize(reuse);
+        } catch (Throwable t) {
+            throw new SeaTunnelJsonFormatException(CommonErrorCode.JSON_OPERATION_FAILED, String.format("Could not serialize row %s.", row), t);
+
+        }
+    }
+
+    private String rowKind2String(RowKind rowKind) {
+        switch (rowKind) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return OP_INSERT;
+            case UPDATE_BEFORE:
+            case DELETE:
+                return OP_DELETE;

Review Comment:
   ```suggestion
               case INSERT:
                   return OP_INSERT;
               case UPDATE_AFTER:
                   return OP_UPDATE_AFTER;
               case UPDATE_BEFORE:
                   return OP_UPDATE_BEFORE;
               case DELETE:
                   return OP_DELETE;
   ```
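
The suggestion above refers to `OP_UPDATE_AFTER` and `OP_UPDATE_BEFORE`, which the class as quoted does not define. A minimal self-contained sketch of the one-to-one mapping follows; the nested `RowKind` enum is a local stand-in for `org.apache.seatunnel.api.table.type.RowKind`, and the string values of the two new constants are assumptions, not taken from the PR.

```java
public class RowKindMappingSketch {

    // Local stand-in for SeaTunnel's RowKind, limited to the four kinds used above.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final String OP_INSERT = "INSERT";
    static final String OP_DELETE = "DELETE";
    // Assumed values: the thread does not say which strings these constants hold.
    static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE";
    static final String OP_UPDATE_AFTER = "UPDATE_AFTER";

    // Maps each row kind to its own operation string instead of folding
    // updates into INSERT and DELETE.
    static String rowKind2String(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
                return OP_INSERT;
            case UPDATE_AFTER:
                return OP_UPDATE_AFTER;
            case UPDATE_BEFORE:
                return OP_UPDATE_BEFORE;
            case DELETE:
                return OP_DELETE;
            default:
                throw new UnsupportedOperationException("Unsupported row kind: " + rowKind);
        }
    }

    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + rowKind2String(kind));
        }
    }
}
```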





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1418471816

   <img width="1193" alt="image" src="https://user-images.githubusercontent.com/60029759/216880154-059afd4b-eec9-46ec-9c8c-24cea41fea06.png">
   The ES e2e and JDBC e2e jobs in CI report errors intermittently.




[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1097340809


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@DisabledOnContainer(value = {}, type = {EngineType.FLINK, EngineType.SPARK})
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    //----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest";
+
+    private static final String KAFKA_TOPIC = "test-canal-sink";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    //----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    //----------------------------------------------------------------------------
+    // postgres
+    private static final String PG_IMAGE = "postgres:alpine3.16";
+
+    private static final String PG_DRIVER_JAR = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+    private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory = container -> {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + PG_DRIVER_JAR);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    };
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties")
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    private void createPostgreSQLContainer() throws ClassNotFoundException {
+        POSTGRESQL_CONTAINER = new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+                .withNetwork(NETWORK)
+                .withNetworkAliases("postgresql")
+                .withExposedPorts(5432)

Review Comment:
   It's better to use constants.





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1407368956

   @hailin0 @TyrantLucifer Thanks for your review, I have made the changes.




[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1418559649

   Overall LGTM. Let's wait for CI/CD.




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1096876446


##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -58,6 +58,21 @@
             <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.5.0</version>
+        </dependency>

Review Comment:
   Please revert this change.





[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1383131235

   
   For reference:
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1090141130


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    //----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    //----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties")
+            .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+
+        LOG.info("The first stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The third stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        inventoryDatabase.createAndInitialize();
+    }
+
+    @TestTemplate
+    public void testCannalToKafakCannalFormatAnalysis(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Review Comment:
   For reference, see how this test checks the CDC sink output:
   
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java#L87
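
A minimal sketch of the idea behind checking sink output rather than only the job's exit code. It does not use any SeaTunnel or Testcontainers API: `ChangeEvent` and `SinkStateCheck` are hypothetical names, and the in-memory map stands in for the sink table. The test would replay the expected changelog and compare the resulting state with what it reads back from the sink.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a changelog row: kind, primary key, and one value column. */
final class ChangeEvent {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    final Kind kind;
    final int id;
    final String weight;

    ChangeEvent(Kind kind, int id, String weight) {
        this.kind = kind;
        this.id = id;
        this.weight = weight;
    }
}

final class SinkStateCheck {
    /** Replays the events in order and returns the table state a sink should end up with. */
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            switch (e.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // Both kinds leave the new value in the table.
                    table.put(e.id, e.weight);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Both kinds retract the row identified by the key.
                    table.remove(e.id);
                    break;
            }
        }
        return table;
    }
}
```

An assertion on the replayed state (for example, that key 111 ends with the updated weight) catches a format bug that still lets the job exit with code 0.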





[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1407965845

   Please add docs at `docs/en/connector-v2/formats/canal-json.md` and link to them from the Kafka docs.




[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1096980335


##########
docs/en/connector-v2/formats/canal-json.md:
##########
@@ -0,0 +1,110 @@
+# Canal Format
+
+Changelog-Data-Capture Format: Serialization Schema, Deserialization Schema
+
+Canal is a CDC (Changelog Data Capture) tool that can stream changes in real time from MySQL into other systems. Canal provides a unified schema for changelogs and can serialize messages as JSON or protobuf (protobuf is Canal's default format).
+
+SeaTunnel can interpret Canal JSON messages as INSERT/UPDATE/DELETE messages in the SeaTunnel system. This is useful in many cases, such as:
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the changing history of a database table, and so on
+
+SeaTunnel can also encode its INSERT/UPDATE/DELETE messages as Canal JSON messages and emit them to storage such as Kafka. However, SeaTunnel currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message, so it encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Canal messages respectively.
+
+# Format Options
+| option                         | default  | required | Description                                                                                                                                                                                                |
+|--------------------------------|----------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| format                         | (none)   | yes      | Specify what format to use, here should be 'canal-json'.                                                                                                                                                   |
+| canal-json.ignore-parse-errors | false    | no       | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.                                                                                                       |
+| canal-json.database.include    | (none)   | no       | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
+| canal-json.table.include       | (none)   | no       | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern.       |
+
+# How to use Canal format
+
+## Kafka usage example
+Canal provides a unified format for changelogs. Here is a simple example of an update operation captured from a MySQL products table:
+```bash
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+}
+```
+Note: please refer to the Canal documentation for the meaning of each field.
+
+The MySQL products table has 4 columns (id, name, description and weight).
+The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.15 to 5.18.
+Assuming the messages have been synchronized to the Kafka topic products_binlog, we can use the following SeaTunnel job config to consume this topic and interpret the change events.
+
+```bash
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test-canal-source"

Review Comment:
   Line 72 refers to the topic as products_binlog, but this example uses test-canal-source. The two names should match.
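
The `canal-json.database.include` and `canal-json.table.include` options in the doc above are described as regular expressions compatible with Java's `Pattern`. A minimal sketch of how such a filter could behave follows. `CanalIncludeFilter` is a hypothetical class, and the use of full-match semantics (`Matcher.matches()`) is an assumption, not something the doc states.

```java
import java.util.regex.Pattern;

/** Hypothetical filter mirroring the two "include" options; a null pattern means "accept all". */
final class CanalIncludeFilter {
    private final Pattern databasePattern;
    private final Pattern tablePattern;

    CanalIncludeFilter(String databaseInclude, String tableInclude) {
        this.databasePattern = databaseInclude == null ? null : Pattern.compile(databaseInclude);
        this.tablePattern = tableInclude == null ? null : Pattern.compile(tableInclude);
    }

    /** Returns true when the record's "database" and "table" meta fields both pass. */
    boolean accepts(String database, String table) {
        // Assumption: the whole name must match, so "inventory" does not accept "inventory_2".
        if (databasePattern != null && !databasePattern.matcher(database).matches()) {
            return false;
        }
        return tablePattern == null || tablePattern.matcher(table).matches();
    }
}
```

Under this assumption, a pattern such as `products.*` accepts `products` and `products_archive` but not `orders`.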





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1072048360


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+public class CanalJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+
+    private static final String FIELD_DATA = "data";
+
+    private static final String FIELD_TYPE = "type";
+
+    private static final String OP_INSERT = "INSERT";
+
+    private static final String OP_UPDATE = "UPDATE";
+
+    private static final String OP_DELETE = "DELETE";
+
+    private static final String OP_CREATE = "CREATE";
+
+    private String database;
+
+    private String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    private final SeaTunnelRowType physicalRowType;
+
+    public CanalJsonDeserializationSchema(SeaTunnelRowType physicalRowType,
+                                          String database,
+                                          String table,
+                                          boolean ignoreParseErrors
+                                          ) {
+        this.physicalRowType = physicalRowType;
+        final SeaTunnelRowType jsonRowType = createJsonRowType(physicalRowType);
+        this.jsonDeserializer = new JsonDeserializationSchema(
+            false,
+            ignoreParseErrors,
+            jsonRowType
+        );
+        this.database = database;
+        this.table = table;
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+        return convertJsonNode(convertBytes(message));
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.physicalRowType;
+    }
+
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {

Review Comment:
   suggestion
   
   
   ```java
   public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
       JsonNode jsonNode = convert(message);
       ArrayNode dataNode = (ArrayNode) jsonNode.get(FIELD_DATA);
       // Switch on the text value: a JsonNode cannot be matched against String case labels.
       switch (jsonNode.get(FIELD_TYPE).asText()) {
           case OP_UPDATE:
               ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
               for (int i = 0; i < dataNode.size(); i++) {
                   JsonNode data = dataNode.get(i);
                   JsonNode old = oldNode.get(i);
   
                   // "old" carries the pre-update values, "data" the post-update values.
                   SeaTunnelRow updateBefore = convert(jsonNode, old);
                   SeaTunnelRow updateAfter = convert(jsonNode, data);
   
                   out.collect(updateBefore);
                   out.collect(updateAfter);
               }
               break;
           case OP_INSERT:
               for (int i = 0; i < dataNode.size(); i++) {
                   JsonNode data = dataNode.get(i);
                   SeaTunnelRow insert = convert(jsonNode, data);
                   out.collect(insert);
               }
               break;
           case OP_DELETE:
               for (int i = 0; i < dataNode.size(); i++) {
                   JsonNode data = dataNode.get(i);
                   SeaTunnelRow delete = convert(jsonNode, data);
                   out.collect(delete);
               }
               break;
           default:
               break;
       }
   }
   ```
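
One detail the suggestion leaves to its `convert` helper: in the Canal sample quoted in this PR's docs, `old` lists only the column that changed (`weight`), not the full row. Assuming that holds in general, the pre-update row has to be rebuilt from `data` with the `old` columns laid over it. A minimal sketch with plain maps in place of Jackson nodes follows; `CanalUpdateRows` and `beforeImage` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CanalUpdateRows {
    /**
     * Rebuilds the full pre-update row: start from the post-update "data" row,
     * then overwrite the columns that appear in "old" with their previous values.
     */
    static Map<String, String> beforeImage(Map<String, String> data, Map<String, String> old) {
        Map<String, String> before = new LinkedHashMap<>(data); // copy, so "data" stays untouched
        before.putAll(old);
        return before;
    }
}
```

With the sample record, `data` has `weight = 5.18` and `old` has `weight = 5.15`, so the rebuilt row keeps `id`, `name` and `description` from `data` and takes `weight = 5.15` from `old`.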





[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#issuecomment-1400522698

   Because of #3947, all modules should now use the shaded Jackson to avoid conflicts with other components, so please add the `org.apache.seatunnel.shade` prefix to every Jackson class.




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3950: [Feature][formats][canal] Support read canal format message

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3950:
URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1093984225


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+public class CanalJsonSerializationSchema implements SerializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
+
+    private transient SeaTunnelRow reuse;
+
+
+    private final JsonSerializationSchema jsonSerializer;
+
+    public CanalJsonSerializationSchema(
+        SeaTunnelRowType rowType) {
+        this.jsonSerializer = new JsonSerializationSchema(createJsonRowType(rowType));
+        this.reuse = new SeaTunnelRow(2);
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        try {
+            String opType = rowKind2String(row.getRowKind());
+            reuse.setField(0, row);
+            reuse.setField(1, opType);
+            return jsonSerializer.serialize(reuse);
+        } catch (Throwable t) {
+            throw new SeaTunnelJsonFormatException(CommonErrorCode.JSON_OPERATION_FAILED, String.format("Could not serialize row %s.", row), t);
+
+        }
+    }
+
+    private String rowKind2String(RowKind rowKind) {
+        switch (rowKind) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return OP_INSERT;
+            case UPDATE_BEFORE:
+            case DELETE:
+                return OP_DELETE;

Review Comment:
   ```suggestion
               case INSERT:
                   return OP_INSERT;
               case UPDATE_AFTER:
                   return OP_UPDATE_AFTER;
               case UPDATE_BEFORE:
                   return OP_UPDATE_BEFORE;
               case DELETE:
                   return OP_DELETE;
   ```
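
The suggestion above refers to `OP_UPDATE_AFTER` and `OP_UPDATE_BEFORE`, which the quoted class does not define. A self-contained sketch of the one-to-one mapping follows. The `RowKind` enum here is a local stand-in for SeaTunnel's own, and the string values of the two new constants are assumptions chosen for illustration.

```java
final class CanalOpTypes {
    /** Local stand-in for SeaTunnel's RowKind, declared here only to keep the sketch self-contained. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final String OP_INSERT = "INSERT";
    static final String OP_DELETE = "DELETE";
    // Assumed values: the suggestion names these constants but does not define them.
    static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE";
    static final String OP_UPDATE_AFTER = "UPDATE_AFTER";

    /** Maps each row kind to its own operation string instead of folding updates into INSERT/DELETE. */
    static String rowKind2String(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
                return OP_INSERT;
            case UPDATE_AFTER:
                return OP_UPDATE_AFTER;
            case UPDATE_BEFORE:
                return OP_UPDATE_BEFORE;
            case DELETE:
                return OP_DELETE;
            default:
                throw new UnsupportedOperationException("Unsupported row kind: " + rowKind);
        }
    }
}
```

Keeping the four kinds distinct lets a downstream reader tell an update apart from an unrelated delete followed by an insert.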


