Posted to commits@seatunnel.apache.org by "zhilinli123 (via GitHub)" <gi...@apache.org> on 2023/04/24 14:41:25 UTC

[GitHub] [incubator-seatunnel] zhilinli123 opened a new pull request, #4639: [Feature][E2E] Add mysql-cdc e2e testcase

zhilinli123 opened a new pull request, #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639

   Issue: https://github.com/apache/incubator-seatunnel/issues/4603
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or do not need tests for the following reason:
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update the change log in the connector document. For more details you can refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534298377

   > > Wait for #4671 to be merged and JDBC MySQL YEAR type support, then add corresponding data type tests
   > 
   > #4671 is merged.
   
   I'm going to add support for the YEAR type first and then refine the type handling:
   https://github.com/apache/incubator-seatunnel/issues/4695




[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1524950949

   > I suggest covering the data types fixed in these two PRs: #4670 TIME #4671 tinyint(1) bit(1) bit(8) bit(32) bit(64)
   
   Could you refer to: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
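
To make the suggested coverage concrete, here is a minimal sketch of a test table definition that includes the types named above (TIME, tinyint(1), bit(1), bit(8), bit(32), bit(64)). The class `TypeCoverageDdl`, the table name passed in, and the column names `f_tinyint1`, `f_bit8`, and `f_bit32` are hypothetical and only illustrate the idea; they are not taken from this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the SeaTunnel code base.
final class TypeCoverageDdl {

    private TypeCoverageDdl() {}

    /** Column name to MySQL type, in declaration order. */
    static Map<String, String> columns() {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "INT NOT NULL PRIMARY KEY");
        columns.put("f_time", "TIME");
        columns.put("f_tinyint1", "TINYINT(1)"); // assumed column name
        columns.put("f_bit1", "BIT(1)");
        columns.put("f_bit8", "BIT(8)");         // assumed column name
        columns.put("f_bit32", "BIT(32)");       // assumed column name
        columns.put("f_bit64", "BIT(64)");
        return columns;
    }

    /** Renders a CREATE TABLE statement from the column map. */
    static String createTable(String table, Map<String, String> columns) {
        List<String> definitions = new ArrayList<>();
        for (Map.Entry<String, String> column : columns.entrySet()) {
            definitions.add(column.getKey() + " " + column.getValue());
        }
        return "CREATE TABLE " + table + " (" + String.join(", ", definitions) + ")";
    }
}
```

Keeping the columns in one ordered map means the same definition could be reused for both the source and the sink table, so the two schemas cannot drift apart.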




[GitHub] [incubator-seatunnel] zhilinli123 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1186632838


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, f_year from mysql_cdc_e2e_source_table";
+    private static final String SINK_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";

Review Comment:
   > sink table is `mysql_cdc_e2e_source_table`?
   
   Sorry for my negligence. I have corrected it. Thank you for your review.





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173446059


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml:
##########
@@ -82,24 +82,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-    <build>

Review Comment:
   Alternatively, fix the logic that finds the connector jar package so that it ignores test.jar:
   
   org.apache.seatunnel.e2e.common.util.ContainerUtil#copyConnectorJarToContainer
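
A minimal sketch of what such a filter could look like, assuming the test artifact carries the Maven default `-tests.jar` suffix. `ConnectorJarFilter` and its methods are hypothetical names used for illustration; this is not the actual body of `ContainerUtil#copyConnectorJarToContainer`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not the real ContainerUtil code.
final class ConnectorJarFilter {

    private ConnectorJarFilter() {}

    /**
     * Accepts jars whose name starts with the connector prefix,
     * but rejects test jars (assumed to end with "-tests.jar").
     */
    static boolean isConnectorJar(String fileName, String connectorPrefix) {
        return fileName.startsWith(connectorPrefix)
                && fileName.endsWith(".jar")
                && !fileName.endsWith("-tests.jar");
    }

    /** Lists the matching connector jars directly under the given directory. */
    static List<Path> findConnectorJars(Path dir, String connectorPrefix) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(Files::isRegularFile)
                    .filter(p -> isConnectorJar(p.getFileName().toString(), connectorPrefix))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }
}
```

With a filter like this, the connector module could keep building its test jar, and the e2e lookup would still resolve to a single connector jar.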





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1186595134


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, f_year from mysql_cdc_e2e_source_table";
+    private static final String SINK_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";

Review Comment:
   sink table is `mysql_cdc_e2e_source_table`?
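
One way to guard against this kind of copy-paste slip, sketched under assumptions: build both queries from a single projection so that only the table name differs. The class `CdcTestQueries` and the sink table name `mysql_cdc_e2e_sink_table` are assumptions, since the real sink table name is not stated here. The column list is abbreviated, and the sketch ignores the `cast(f_year as year)` difference between the two original queries.

```java
// Illustrative sketch only; not the code from this PR.
final class CdcTestQueries {

    static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
    // Assumed name: the real sink table name is not given in this thread.
    static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";

    // Abbreviated projection; the real test selects many more columns.
    private static final String PROJECTION =
            "id, cast(f_binary as char) as f_binary, f_smallint, f_json, f_year";

    private CdcTestQueries() {}

    /** Builds the comparison query for one table from the shared projection. */
    static String selectFrom(String table) {
        return "select " + PROJECTION + " from " + table;
    }
}
```

`selectFrom(SOURCE_TABLE)` and `selectFrom(SINK_TABLE)` then differ only in the `from` clause, which makes a wrong table name easier to spot in review.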





[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534246175

   > Wait for #4671 to be merged and JDBC MySQL YEAR type support, then add corresponding data type tests
   
   #4671  is merged.




[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1535968091

   @ic4y @hailin0 @EricJoy2048 Please take a look, thanks.
   




[GitHub] [incubator-seatunnel] zhilinli123 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1187561597


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, f_year from mysql_cdc_e2e_source_table";
+    private static final String SINK_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";

Review Comment:
   @hailin0 I have changed it. Thank you for your careful review.





+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, f_year from mysql_cdc_e2e_source_table";
+    private static final String SINK_SQL =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";

Review Comment:
   @hailin0 Thank you for the CI review.
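
The two query constants in the hunk above repeat the same `cast(... as char)` pattern for every binary column. A minimal sketch of generating such a select list from a column list is shown here. `SelectListSketch`, `buildSelect`, and the shortened column list are hypothetical names used for illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SelectListSketch {

    /**
     * Builds "select ... from table", keeping the given column order and
     * wrapping every column named in castToChar as cast(col as char) as col.
     */
    static String buildSelect(String table, List<String> columns, Set<String> castToChar) {
        List<String> parts = new ArrayList<>();
        for (String column : columns) {
            if (castToChar.contains(column)) {
                parts.add("cast(" + column + " as char) as " + column);
            } else {
                parts.add(column);
            }
        }
        return "select " + String.join(", ", parts) + " from " + table;
    }

    public static void main(String[] args) {
        // Shortened, illustrative column list; the real table has many more columns.
        List<String> columns = Arrays.asList("id", "f_binary", "f_smallint");
        Set<String> binaryColumns = new HashSet<>(Arrays.asList("f_binary"));
        System.out.println(buildSelect("mysql_cdc_e2e_source_table", columns, binaryColumns));
        // prints: select id, cast(f_binary as char) as f_binary, f_smallint from mysql_cdc_e2e_source_table
    }
}
```

Calling the same helper once per table name would keep the source and sink queries from drifting apart, at the cost of making the literal SQL less visible in the test.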





[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 merged PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639




[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1520298798

   Please rerun CI, thanks.




[GitHub] [incubator-seatunnel] zhilinli123 closed pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 closed pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
URL: https://github.com/apache/incubator-seatunnel/pull/4639




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1523198177

   LGTM




[GitHub] [incubator-seatunnel] ic4y commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1524924163

   I suggest covering the data types fixed in these two PRs:
   https://github.com/apache/incubator-seatunnel/pull/4670
   TIME
   https://github.com/apache/incubator-seatunnel/pull/4671
   tinyint(1) bit(1) bit(8) bit(32) bit(64)
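
When adding coverage for the BIT widths listed above, a test usually needs to compare values of different widths in one common form. The sketch below assumes that the JDBC driver returns `BIT(n)` with n > 1 as a big-endian `byte[]` (this matches my understanding of the MySQL Connector/J type mapping, but it should be verified for the driver version in use). `BitColumnSketch` and `toUnsigned` are hypothetical names and are not part of the PR.

```java
import java.math.BigInteger;

final class BitColumnSketch {

    /** Interprets a big-endian byte array as a non-negative integer value. */
    static BigInteger toUnsigned(byte[] bigEndian) {
        // Signum 1 forces an unsigned reading, so 0xFF becomes 255 rather than -1.
        return new BigInteger(1, bigEndian);
    }

    public static void main(String[] args) {
        byte[] bit8 = {(byte) 0xFF};
        byte[] bit64 = {
            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF,
            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF
        };
        System.out.println(toUnsigned(bit8));  // prints 255
        System.out.println(toUnsigned(bit64)); // prints 18446744073709551615
    }
}
```

`BigInteger` is used here because a full `bit(64)` value does not fit in a signed `long`.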




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1177709745


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql:
##########
@@ -0,0 +1,52 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  inventory
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `mysql_cdc`;
+
+use mysql_cdc;
+-- Create a mysql data source table
+CREATE TABLE mysql_cdc.products_cdc (

Review Comment:
   create table if not exists test1000004
   (
       id                   int auto_increment primary key,
       f_binary             binary(64)          null,
       f_blob               blob                null,
       f_long_varbinary     mediumblob          null,
       f_longblob           longblob            null,
       f_tinyblob           tinyblob            null,
       f_varbinary          varbinary(100)      null,
       f_smallint           smallint            null,
       f_smallint_unsigned  smallint unsigned   null,
       f_mediumint          mediumint           null,
       f_mediumint_unsigned mediumint unsigned  null,
       f_int                int                 null,
       f_int_unsigned       int unsigned        null,
       f_integer            int                 null,
       f_integer_unsigned   int unsigned        null,
       f_bigint             bigint              null,
       f_bigint_unsigned    bigint unsigned     null,
       f_numeric            decimal             null,
       f_decimal            decimal             null,
       f_float              float               null,
       f_double             double              null,
       f_double_precision   double              null,
       f_longtext           longtext            null,
       f_mediumtext         mediumtext          null,
       f_text               text                null,
       f_tinytext           tinytext            null,
       f_varchar            varchar(100)        null,
       f_date               date                null,
       f_datetime           datetime            null,
       f_timestamp          timestamp           null,
       f_bit1               bit(1)              null,
       f_bit64              bit(64)             null,
       f_char               char                null,
       f_enum               enum('enum1', 'enum2', 'enum3') null,
       f_mediumblob         mediumblob          null,
       f_long_varchar       long varchar        null,
       f_real               real                null,
       f_time               time                null,
       f_tinyint            tinyint             null,
       f_tinyint_unsigned   tinyint unsigned    null,
       f_json               json                null,
       f_year               year                null,
       f_set                set('set1', 'set2', 'set3') null
   );





[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534299662

   > > Wait for #4671 to be merged and JDBC MySQL YEAR type support, then add corresponding data type tests
   > 
   > #4671 is merged.
   
   I'm going to add support for the YEAR type first and then refine the types.
   https://github.com/apache/incubator-seatunnel/issues/4695


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1519281793

   Please merge the dev branch.




[GitHub] [incubator-seatunnel] ic4y commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534181114

   Wait for https://github.com/apache/incubator-seatunnel/pull/4671 to be merged and for JDBC MySQL YEAR type support to be added, then add the corresponding data type tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] zhilinli123 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173439827


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;

Review Comment:
   Done
   
   





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase #4603

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173272683


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+    private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withUsername(MYSQL_USER_NAME)
+                        .withPassword(MYSQL_USER_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));

Review Comment:
   ```suggestion
                           .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")

Review Comment:
   ```suggestion
           disabledReason = "Currently SPARK and FLINK do not support cdc")
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+    private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withUsername(MYSQL_USER_NAME)
+                        .withPassword(MYSQL_USER_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return mySqlContainer;
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException {
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");

Review Comment:
   ```suggestion
           log.info("Mysql Containers are started");
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+    private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withUsername(MYSQL_USER_NAME)
+                        .withPassword(MYSQL_USER_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return mySqlContainer;
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException {
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        LOG.info("Mysql ddl execution is complete");
+    }
+
+    @TestTemplate
+    public void test(TestContainer container) throws IOException, InterruptedException {
+        LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+        LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+        LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+        CompletableFuture<Void> executeJobFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                container.executeJob("/mysqlcdc_to_console.conf");
+                            } catch (Exception e) {
+                                LOG.error("Commit task exception :" + e.getMessage());
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        });
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            log.info(querySql(SINK_SQL).toString());

Review Comment:
   Remove this debug log line.
   ```suggestion
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_console.conf:
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  execution.checkpoint.interval = 5000
+}
+
+# #   MySQL-CDC {
+#       result_table_name = "fake"
+#       parallelism = 1
+#       server-id = 5656
+#       username = "mysqluser"
+#       password = "mysqlpw"
+#       table-names = ["inventory_vwyw0n.products"]
+#       base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
+#     }
+
+source {
+  MySQL-CDC {
+    result_table_name = "customers_mysql_cdc"
+    parallelism = 1
+    server-id = 5652
+    username = "st_user"
+    password = "seatunnel"
+    table-names = ["mysql_cdc.products_cdc"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+  }
+}
+
+transform {
+}
+
+sink {
+jdbc {
+    source_table_name = "customers_mysql_cdc"
+    url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user"
+    password = "seatunnel"
+
+    generate_sink_sql = true
+    # You need to configure both database and table
+    database = mysql_cdc
+    table = products_cdc_sink
+    primary_keys = ["id"]
+}

Review Comment:
   ```suggestion
     jdbc {
       source_table_name = "customers_mysql_cdc"
       url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
       driver = "com.mysql.cj.jdbc.Driver"
       user = "st_user"
       password = "seatunnel"
   
       generate_sink_sql = true
       # You need to configure both database and table
       database = mysql_cdc
       table = products_cdc_sink
       primary_keys = ["id"]
     }
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;

Review Comment:
   Move these test utilities from `connector-cdc-mysql` to `connector-cdc-mysql-e2e`.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_console.conf:
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  execution.checkpoint.interval = 5000
+}
+
+# #   MySQL-CDC {
+#       result_table_name = "fake"
+#       parallelism = 1
+#       server-id = 5656
+#       username = "mysqluser"
+#       password = "mysqlpw"
+#       table-names = ["inventory_vwyw0n.products"]
+#       base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
+#     }

Review Comment:
   Remove this commented-out block.
   ```suggestion
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+    private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withUsername(MYSQL_USER_NAME)
+                        .withPassword(MYSQL_USER_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return mySqlContainer;
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException {
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        LOG.info("Mysql ddl execution is complete");
+    }
+
+    @TestTemplate
+    public void test(TestContainer container) throws IOException, InterruptedException {
+        LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+        LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+        LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());

Review Comment:
   Can these debug log lines be removed?



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);

Review Comment:
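   Remove this explicit logger; the class is already annotated with `@Slf4j`, which provides a `log` field.
   ```suggestion
   ```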



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+    private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withUsername(MYSQL_USER_NAME)
+                        .withPassword(MYSQL_USER_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return mySqlContainer;
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException {
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        LOG.info("Mysql ddl execution is complete");
+    }
+
+    @TestTemplate
+    public void test(TestContainer container) throws IOException, InterruptedException {
+        LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+        LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+        LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+        CompletableFuture<Void> executeJobFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                container.executeJob("/mysqlcdc_to_console.conf");

Review Comment:
   ```suggestion
                                   container.executeJob("/mysqlcdc_to_jdbc.conf");
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_console.conf:
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  execution.checkpoint.interval = 5000
+}
+
+# #   MySQL-CDC {
+#       result_table_name = "fake"
+#       parallelism = 1
+#       server-id = 5656
+#       username = "mysqluser"
+#       password = "mysqlpw"
+#       table-names = ["inventory_vwyw0n.products"]
+#       base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
+#     }
+
+source {
+  MySQL-CDC {
+    result_table_name = "customers_mysql_cdc"
+    parallelism = 1

Review Comment:
   Remove this line.
   ```suggestion
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+    private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withUsername(MYSQL_USER_NAME)
+                        .withPassword(MYSQL_USER_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return mySqlContainer;
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException {
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        LOG.info("Mysql ddl execution is complete");
+    }
+
+    @TestTemplate
+    public void test(TestContainer container) throws IOException, InterruptedException {
+        LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+        LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+        LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+        CompletableFuture<Void> executeJobFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                container.executeJob("/mysqlcdc_to_console.conf");
+                            } catch (Exception e) {
+                                LOG.error("Commit task exception :" + e.getMessage());
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        });
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            log.info(querySql(SINK_SQL).toString());
+                            Assertions.assertIterableEquals(
+                                    querySql(SOURCE_SQL), querySql(SINK_SQL));
+                        });
+
+        // insert update delete
+        upsertDeleteSourceTable();
+
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(10, querySql(SINK_SQL).size());
+                        });

Review Comment:
   Duplicate check; please remove this block.
   ```suggestion
   ```
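
A possible reading of the comment above is that comparing the full contents of the source and sink tables already implies that the row counts match, so a separate size assertion adds nothing. The sketch below illustrates that idea with plain JDK classes only; `AwaitSketch` and `awaitEqual` are hypothetical names and are not part of SeaTunnel, Awaitility, or this PR, and the in-memory lists merely stand in for the results of `querySql(SOURCE_SQL)` and `querySql(SINK_SQL)`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only: polls two row suppliers until
// they return equal lists or a timeout elapses.
public class AwaitSketch {

    static boolean awaitEqual(
            Supplier<? extends List<?>> source,
            Supplier<? extends List<?>> sink,
            long timeoutMillis,
            long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            // List equality compares size and every element in order,
            // so no separate row-count check is needed.
            if (source.get().equals(sink.get())) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        List<String> sourceRows = List.of("row-1", "row-2", "row-3");
        List<String> sinkRows = new CopyOnWriteArrayList<>();

        // Simulates a streaming job that fills the sink a little later.
        Thread writer =
                new Thread(
                        () -> {
                            try {
                                Thread.sleep(50);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            sinkRows.addAll(sourceRows);
                        });
        writer.start();

        boolean synced = awaitEqual(() -> sourceRows, () -> sinkRows, 2000, 20);
        System.out.println("sink matches source: " + synced);
    }
}
```

In the actual test, the same effect would presumably come from keeping a single Awaitility `untilAsserted` block with `Assertions.assertIterableEquals` after the insert, update, and delete statements have run.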



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173307021


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+    // mysql statement
+    private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+    private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("docker/server-gtids/my.cnf")
+                        .withSetupSQL("docker/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withUsername(MYSQL_USER_NAME)
+                        .withPassword(MYSQL_USER_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return mySqlContainer;
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException {
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        LOG.info("Mysql ddl execution is complete");
+    }
+
+    @TestTemplate
+    public void test(TestContainer container) throws IOException, InterruptedException {
+        LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+        LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+        LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+        CompletableFuture<Void> executeJobFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                container.executeJob("/mysqlcdc_to_console.conf");

Review Comment:
   Rename `mysqlcdc_to_console.conf` to `mysqlcdc_to_jdbc.conf`:
   ```suggestion
                                   container.executeJob("/mysqlcdc_to_jdbc.conf");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase #4603

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1517296949

   Please check the CI error.




[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase

Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1519296090

   > merge dev branch

   I have merged the dev branch. Please help run CI.
   @hailin0 

