Posted to commits@seatunnel.apache.org by "zhilinli123 (via GitHub)" <gi...@apache.org> on 2023/04/24 14:41:25 UTC
[GitHub] [incubator-seatunnel] zhilinli123 opened a new pull request, #4639: [Feature][E2E] Add mysql-cdc e2e testcase
zhilinli123 opened a new pull request, #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639
Issue: https://github.com/apache/incubator-seatunnel/issues/4603
## Purpose of this pull request
## Check list
* [ ] Code changes are covered with tests, or do not need tests for this reason:
* [ ] If any new JAR binary package is added in your PR, please add a license notice according to the
[New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
* [ ] If you are contributing connector code, please check that the following files are updated:
1. Update the change log in the connector document. For more details, refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add the new connector's information to it
3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
* [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534298377
> > Wait for #4671 to be merged and JDBC MySQL YEAR type support, then add corresponding data type tests
>
> #4671 is merged.
I'll add YEAR support first, then refine the types:
https://github.com/apache/incubator-seatunnel/issues/4695
[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1524950949
> I suggest covering the data types fixed in these two PRs: #4670 TIME #4671 tinyint(1) bit(1) bit(8) bit(32) bit(64)
Can you refer to: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
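For illustration, a minimal sketch of a test table that covers the data types named in the quoted suggestion (TIME, tinyint(1), bit(1), bit(8), bit(32), bit(64)). The class name `TypeCoverageDdl`, the table name `type_coverage`, and the column names are hypothetical and are not taken from the PR; the sketch only assembles a MySQL `CREATE TABLE` statement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of SeaTunnel): builds a DDL statement for a
// table with one column per data type mentioned in the review suggestion.
final class TypeCoverageDdl {

    private TypeCoverageDdl() {}

    // Column names are assumptions chosen for this example only.
    static Map<String, String> columns() {
        Map<String, String> cols = new LinkedHashMap<>(); // keeps insertion order
        cols.put("id", "INT NOT NULL PRIMARY KEY");
        cols.put("f_time", "TIME");
        cols.put("f_tinyint1", "TINYINT(1)");
        cols.put("f_bit1", "BIT(1)");
        cols.put("f_bit8", "BIT(8)");
        cols.put("f_bit32", "BIT(32)");
        cols.put("f_bit64", "BIT(64)");
        return cols;
    }

    // Joins "name type" pairs into a single CREATE TABLE statement.
    static String createTable(String table) {
        return columns().entrySet().stream()
                .map(e -> e.getKey() + " " + e.getValue())
                .collect(Collectors.joining(", ", "CREATE TABLE " + table + " (", ")"));
    }

    public static void main(String[] args) {
        System.out.println(createTable("type_coverage"));
    }
}
```

Keeping the columns in one ordered map means a new type can be added to the coverage in a single place.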
[GitHub] [incubator-seatunnel] zhilinli123 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1186632838
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, f_year from mysql_cdc_e2e_source_table";
+ private static final String SINK_SQL =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";
Review Comment:
> sink table is `mysql_cdc_e2e_source_table`?
Sorry for the oversight; I have corrected it. Thank you for your review.
[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173446059
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml:
##########
@@ -82,24 +82,4 @@
<scope>test</scope>
</dependency>
</dependencies>
-
- <build>
Review Comment:
Alternatively, fix the logic that finds the connector jar package so that it ignores the test jar:
org.apache.seatunnel.e2e.common.util.ContainerUtil#copyConnectorJarToContainer
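A minimal sketch of that alternative, assuming the test jar uses Maven's default `-tests.jar` suffix. `ConnectorJarFilter`, `isConnectorJar`, and `selectConnectorJars` are hypothetical names, the version numbers are made up, and this is not the actual body of `ContainerUtil#copyConnectorJarToContainer`; it only shows the kind of file-name filter the comment describes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of SeaTunnel): picks connector jars by file
// name while skipping test jars.
final class ConnectorJarFilter {

    private ConnectorJarFilter() {}

    // Assumption: test jars carry the Maven default "-tests.jar" suffix.
    static boolean isConnectorJar(String fileName, String connectorPrefix) {
        return fileName.startsWith(connectorPrefix)
                && fileName.endsWith(".jar")
                && !fileName.endsWith("-tests.jar");
    }

    // Returns only the file names that look like the connector's main jar.
    static List<String> selectConnectorJars(List<String> fileNames, String connectorPrefix) {
        return fileNames.stream()
                .filter(name -> isConnectorJar(name, connectorPrefix))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Illustrative file names only.
        List<String> candidates =
                Arrays.asList(
                        "connector-cdc-mysql-2.3.1.jar",
                        "connector-cdc-mysql-2.3.1-tests.jar",
                        "connector-jdbc-2.3.1.jar");
        System.out.println(selectConnectorJars(candidates, "connector-cdc-mysql"));
    }
}
```

With a filter like this, the connector module could keep producing a test jar without the e2e container picking it up by mistake.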
[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1186595134
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, f_year from mysql_cdc_e2e_source_table";
+ private static final String SINK_SQL =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";
Review Comment:
sink table is `mysql_cdc_e2e_source_table`?
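One way to avoid this kind of copy-paste slip is to derive both verification queries from a single column list, so that only the table name differs. The sketch below is an illustration under assumptions: `CdcQueries` and `selectFrom` are hypothetical names, the sink table name `mysql_cdc_e2e_sink_table` is assumed (the thread does not state it), and the column list is shortened. Per-table differences, such as the `cast(f_year as year)` in the sink query, would still need separate handling.

```java
// Hypothetical sketch (not the code from the PR): build the source and sink
// queries from one shared column list.
final class CdcQueries {

    private CdcQueries() {}

    // Shortened for illustration; the real test selects many more columns.
    private static final String COLUMNS =
            "id, cast(f_binary as char) as f_binary, f_smallint, f_json, f_year";

    static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
    // Assumed name; the review thread does not give the sink table name.
    static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";

    // Only the table name varies between the two queries.
    static String selectFrom(String table) {
        return "select " + COLUMNS + " from " + table;
    }

    public static void main(String[] args) {
        System.out.println(selectFrom(SOURCE_TABLE));
        System.out.println(selectFrom(SINK_TABLE));
    }
}
```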
[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534246175
> Wait for #4671 to be merged and JDBC MySQL YEAR type support, then add corresponding data type tests
#4671 is merged.
[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1535968091
@ic4y @hailin0 @EricJoy2048 Please take a look, thanks.
[GitHub] [incubator-seatunnel] zhilinli123 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1187561597
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, f_year from mysql_cdc_e2e_source_table";
+ private static final String SINK_SQL =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";
Review Comment:
@hailin0 Thank you for the careful code review; I have changed it.
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, cast(f_year as year) from mysql_cdc_e2e_source_table";
Review Comment:
@hailin0 Thank you for the CI review.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 merged PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639
[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1520298798
Please rerun CI, thanks.
[GitHub] [incubator-seatunnel] zhilinli123 closed pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 closed pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
URL: https://github.com/apache/incubator-seatunnel/pull/4639
[GitHub] [incubator-seatunnel] hailin0 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1523198177
LGTM
[GitHub] [incubator-seatunnel] ic4y commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1524924163
I suggest covering the data types fixed in these two PRs:
https://github.com/apache/incubator-seatunnel/pull/4670
TIME
https://github.com/apache/incubator-seatunnel/pull/4671
tinyint(1) bit(1) bit(8) bit(32) bit(64)
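For the wider BIT columns, a test would need a stable way to compare source and sink values. A minimal sketch, assuming the value arrives as a big-endian byte array (an assumption about how the driver returns BIT(n) for n > 1, not something this PR confirms); `BitValueSketch` and `bitsToUnsigned` are hypothetical names used only for illustration:

```java
import java.math.BigInteger;

public class BitValueSketch {

    /**
     * Interprets raw BIT(n) bytes as an unsigned, big-endian integer.
     * BigInteger is used because BIT(64) does not fit in a signed long.
     */
    public static BigInteger bitsToUnsigned(byte[] raw) {
        // signum = 1 treats the magnitude bytes as a non-negative number
        return new BigInteger(1, raw);
    }

    public static void main(String[] args) {
        byte[] bit8 = {(byte) 0xFF};
        System.out.println(bitsToUnsigned(bit8));
    }
}
```

Comparing the decoded numbers rather than the raw arrays avoids relying on array identity in assertions.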
[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1177709745
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql:
##########
@@ -0,0 +1,52 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: inventory
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `mysql_cdc`;
+
+use mysql_cdc;
+-- Create a mysql data source table
+CREATE TABLE mysql_cdc.products_cdc (
Review Comment:
create table if not exists test1000004
(
id int auto_increment primary key,
f_binary binary(64) null,
f_blob blob null,
f_long_varbinary mediumblob null,
f_longblob longblob null,
f_tinyblob tinyblob null,
f_varbinary varbinary(100) null,
f_smallint smallint null,
f_smallint_unsigned smallint unsigned null,
f_mediumint mediumint null,
f_mediumint_unsigned mediumint unsigned null,
f_int int null,
f_int_unsigned int unsigned null,
f_integer int null,
f_integer_unsigned int unsigned null,
f_bigint bigint null,
f_bigint_unsigned bigint unsigned null,
f_numeric decimal null,
f_decimal decimal null,
f_float float null,
f_double double null,
f_double_precision double null,
f_longtext longtext null,
f_mediumtext mediumtext null,
f_text text null,
f_tinytext tinytext null,
f_varchar varchar(100) null,
f_date date null,
f_datetime datetime null,
f_timestamp timestamp null,
f_bit1 BIT(1) NULL,
f_bit64 BIT(64) NULL,
f_char CHAR NULL,
f_enum ENUM('enum1', 'enum2', 'enum3') NULL,
f_mediumblob MEDIUMBLOB NULL,
f_long_varchar LONG VARCHAR NULL,
f_real REAL NULL,
f_time TIME NULL,
f_tinyint TINYINT NULL,
f_tinyint_unsigned TINYINT UNSIGNED NULL,
f_json json NULL,
f_year year null,
f_set set('set1', 'set2', 'set3') null
);
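A table like this could feed the source/sink comparison in the test. A minimal sketch, assuming binary-like columns are cast to char before comparison in the same way as the PR's SOURCE_SQL; `ComparisonQueryBuilder` and `buildSelect` are hypothetical names, not part of the PR:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ComparisonQueryBuilder {

    // Columns that SOURCE_SQL in the PR casts to char so rows compare as text.
    private static final Set<String> CAST_TO_CHAR =
            new LinkedHashSet<>(
                    Arrays.asList(
                            "f_binary", "f_blob", "f_long_varbinary", "f_longblob",
                            "f_tinyblob", "f_varbinary", "f_bit64", "f_mediumblob"));

    /** Builds a select statement, wrapping binary-like columns in cast(... as char). */
    public static String buildSelect(List<String> columns, String table) {
        String projection =
                columns.stream()
                        .map(c -> CAST_TO_CHAR.contains(c) ? "cast(" + c + " as char) as " + c : c)
                        .collect(Collectors.joining(", "));
        return "select " + projection + " from " + table;
    }

    public static void main(String[] args) {
        System.out.println(buildSelect(Arrays.asList("id", "f_binary", "f_int"), "test1000004"));
    }
}
```

Generating both queries from one column list would keep the source and sink projections from drifting apart.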
[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534299662
> > Wait for #4671 to be merged and JDBC MySQL YEAR type support, then add corresponding data type tests
>
> #4671 is merged.
I'm going to add YEAR support first and then refine the types.
https://github.com/apache/incubator-seatunnel/issues/4695
[GitHub] [incubator-seatunnel] hailin0 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1519281793
Please merge the dev branch.
[GitHub] [incubator-seatunnel] ic4y commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1534181114
Wait until https://github.com/apache/incubator-seatunnel/pull/4671 is merged and JDBC supports the MySQL YEAR type, then add the corresponding data type tests.
[GitHub] [incubator-seatunnel] zhilinli123 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173439827
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
Review Comment:
Done
[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase #4603
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173272683
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+ private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ MySqlContainer mySqlContainer =
+ new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
Review Comment:
```suggestion
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "")
Review Comment:
```suggestion
disabledReason = "Currently SPARK and FLINK do not support cdc")
```
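A non-empty reason matters because it is typically what ends up in the skip message. A minimal sketch of that idea, using a hypothetical stand-in annotation (`DisabledOnEngine`) read via reflection; this is not SeaTunnel's actual `DisabledOnContainer` implementation, and `skipMessage` is an illustrative helper:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class DisabledReasonSketch {

    // Hypothetical stand-in for an engine-based "disabled" annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface DisabledOnEngine {
        String[] type() default {};

        String disabledReason() default "";
    }

    @DisabledOnEngine(
            type = {"SPARK", "FLINK"},
            disabledReason = "Currently SPARK and FLINK do not support cdc")
    public static class ExampleIT {}

    /** Returns the message a runner could print when skipping, or null if the test is enabled. */
    public static String skipMessage(Class<?> testClass, String engine) {
        DisabledOnEngine annotation = testClass.getAnnotation(DisabledOnEngine.class);
        if (annotation == null) {
            return null;
        }
        for (String disabled : annotation.type()) {
            if (disabled.equals(engine)) {
                // An empty reason leaves readers of the CI log guessing.
                String reason =
                        annotation.disabledReason().isEmpty()
                                ? "<no reason given>"
                                : annotation.disabledReason();
                return testClass.getSimpleName() + " skipped on " + engine + ": " + reason;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(skipMessage(ExampleIT.class, "FLINK"));
    }
}
```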
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ MySqlContainer mySqlContainer =
+ new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ return mySqlContainer;
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws ClassNotFoundException, InterruptedException {
+ LOG.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
Review Comment:
```suggestion
log.info("Mysql Containers are started");
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+ @BeforeAll
+ @Override
+ public void startUp() throws ClassNotFoundException, InterruptedException {
+ LOG.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
+ inventoryDatabase.createAndInitialize();
+ LOG.info("Mysql ddl execution is complete");
+ }
+
+ @TestTemplate
+ public void test(TestContainer container) throws IOException, InterruptedException {
+ LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+ LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+ LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+ CompletableFuture<Void> executeJobFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mysqlcdc_to_console.conf");
+ } catch (Exception e) {
+ LOG.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ log.info(querySql(SINK_SQL).toString());
Review Comment:
```suggestion
```
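For context on the suggestion above: the lambda passed to `untilAsserted` is re-run on every poll, so a `log.info` inside it prints the full sink result set each time until the assertion passes. A minimal stdlib-only sketch of that polling pattern follows; `pollUntilEqual` is a hypothetical helper written for illustration, not part of Awaitility or this PR:

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PollUntilEqual {

    /**
     * Re-evaluates both suppliers until they return equal values or the timeout elapses.
     * Returns true on a match, false on timeout or interruption.
     */
    public static <T> boolean pollUntilEqual(
            Supplier<T> expected, Supplier<T> actual, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            // Both suppliers run on every iteration, so anything logged inside them repeats per poll.
            if (Objects.equals(expected.get(), actual.get())) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] polls = {0};
        boolean matched =
                pollUntilEqual(
                        () -> "synced", () -> ++polls[0] >= 3 ? "synced" : "pending", 1000, 10);
        System.out.println(matched + " after " + polls[0] + " polls");
    }
}
```

If the sink contents are still worth logging, doing it once after the wait completes keeps the output to a single entry.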
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_console.conf:
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+# # MySQL-CDC {
+# result_table_name = "fake"
+# parallelism = 1
+# server-id = 5656
+# username = "mysqluser"
+# password = "mysqlpw"
+# table-names = ["inventory_vwyw0n.products"]
+# base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
+# }
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ parallelism = 1
+ server-id = 5652
+ username = "st_user"
+ password = "seatunnel"
+ table-names = ["mysql_cdc.products_cdc"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ }
+}
+
+transform {
+}
+
+sink {
+jdbc {
+ source_table_name = "customers_mysql_cdc"
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user"
+ password = "seatunnel"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = mysql_cdc
+ table = products_cdc_sink
+ primary_keys = ["id"]
+}
Review Comment:
```suggestion
jdbc {
source_table_name = "customers_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"
generate_sink_sql = true
# You need to configure both database and table
database = mysql_cdc
table = products_cdc_sink
primary_keys = ["id"]
}
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
Review Comment:
Move these test utilities from `connector-cdc-mysql` to `connector-cdc-mysql-e2e`.
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_console.conf:
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+# # MySQL-CDC {
+# result_table_name = "fake"
+# parallelism = 1
+# server-id = 5656
+# username = "mysqluser"
+# password = "mysqlpw"
+# table-names = ["inventory_vwyw0n.products"]
+# base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
+# }
Review Comment:
```suggestion
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+ private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ MySqlContainer mySqlContainer =
+ new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ return mySqlContainer;
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws ClassNotFoundException, InterruptedException {
+ LOG.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
+ inventoryDatabase.createAndInitialize();
+ LOG.info("Mysql ddl execution is complete");
+ }
+
+ @TestTemplate
+ public void test(TestContainer container) throws IOException, InterruptedException {
+ LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+ LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+ LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
Review Comment:
Can these debug log lines be removed?
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
Review Comment:
```suggestion
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+ private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ MySqlContainer mySqlContainer =
+ new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ return mySqlContainer;
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws ClassNotFoundException, InterruptedException {
+ LOG.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
+ inventoryDatabase.createAndInitialize();
+ LOG.info("Mysql ddl execution is complete");
+ }
+
+ @TestTemplate
+ public void test(TestContainer container) throws IOException, InterruptedException {
+ LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+ LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+ LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+ CompletableFuture<Void> executeJobFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mysqlcdc_to_console.conf");
Review Comment:
```suggestion
container.executeJob("/mysqlcdc_to_jdbc.conf");
```
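A side note on the surrounding code, separate from the rename suggested above: the job is started with `CompletableFuture.supplyAsync` and a `return null;` to obtain a `CompletableFuture<Void>`. `CompletableFuture.runAsync` gives the same type without the dummy return. A minimal JDK-only sketch, assuming the job call blocks until the streaming job is stopped; `BackgroundJobSketch`, `submit`, and `jobBlockedOn` are hypothetical names and not SeaTunnel API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for starting a blocking streaming job; not SeaTunnel API.
final class BackgroundJobSketch {

    private BackgroundJobSketch() {}

    // Starts a blocking task on a background thread. runAsync already yields
    // CompletableFuture<Void>, so the lambda needs no "return null".
    static CompletableFuture<Void> submit(Runnable blockingJob) {
        return CompletableFuture.runAsync(blockingJob);
    }

    // Simulates a job that keeps running until the latch is released.
    static Runnable jobBlockedOn(CountDownLatch stopSignal) {
        return () -> {
            try {
                if (!stopSignal.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("job was never stopped");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        };
    }
}
```

In the test, the lambda body would wrap `container.executeJob(...)` and rethrow its checked exceptions as a `RuntimeException`, as the quoted code already does.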
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_console.conf:
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+# # MySQL-CDC {
+# result_table_name = "fake"
+# parallelism = 1
+# server-id = 5656
+# username = "mysqluser"
+# password = "mysqlpw"
+# table-names = ["inventory_vwyw0n.products"]
+# base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
+# }
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ parallelism = 1
Review Comment:
```suggestion
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+ private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ MySqlContainer mySqlContainer =
+ new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ return mySqlContainer;
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws ClassNotFoundException, InterruptedException {
+ LOG.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
+ inventoryDatabase.createAndInitialize();
+ LOG.info("Mysql ddl execution is complete");
+ }
+
+ @TestTemplate
+ public void test(TestContainer container) throws IOException, InterruptedException {
+ LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+ LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+ LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+ CompletableFuture<Void> executeJobFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mysqlcdc_to_console.conf");
+ } catch (Exception e) {
+ LOG.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ log.info(querySql(SINK_SQL).toString());
+ Assertions.assertIterableEquals(
+ querySql(SOURCE_SQL), querySql(SINK_SQL));
+ });
+
+ // insert update delete
+ upsertDeleteSourceTable();
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(10, querySql(SINK_SQL).size());
+ });
Review Comment:
This check is a duplicate; please remove it.
```suggestion
```
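As a rough illustration of why the size assertion is redundant: if the test goes on to compare the sink rows with the source rows element by element (an assumption, since that part of the file is not quoted here), a row-count mismatch already fails that comparison. The sketch below uses only the JDK. `PollingAssert`, `untilAsserted`, and `assertSameRows` are hypothetical names standing in for Awaitility's `await().atMost(...).untilAsserted(...)` and JUnit's `Assertions.assertIterableEquals`; they are not code from this PR.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical JDK-only stand-in for the Awaitility + JUnit calls used in the test.
final class PollingAssert {

    private PollingAssert() {}

    // Re-runs the assertion until it passes or the timeout elapses.
    static void untilAsserted(Duration atMost, Duration pollInterval, Runnable assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // give up and surface the last failure
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", ie);
            }
        }
    }

    // Element-wise comparison. A size mismatch already makes this fail,
    // so a separate row-count assertion adds nothing.
    static void assertSameRows(List<List<Object>> expected, List<List<Object>> actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }
}
```

With a helper like this, a single polled comparison after the insert, update, and delete statements covers both the row count and the row contents.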
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#discussion_r1173307021
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "")
+public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCIT.class);
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+
+ // mysql statement
+ private static final String SOURCE_SQL = "select * from mysql_cdc.products_cdc";
+ private static final String SINK_SQL = "select * from mysql_cdc.products_cdc_sink";
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ MySqlContainer mySqlContainer =
+ new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ return mySqlContainer;
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws ClassNotFoundException, InterruptedException {
+ LOG.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
+ inventoryDatabase.createAndInitialize();
+ LOG.info("Mysql ddl execution is complete");
+ }
+
+ @TestTemplate
+ public void test(TestContainer container) throws IOException, InterruptedException {
+ LOG.info("-------docker mysql host:{}", MYSQL_CONTAINER.getHost());
+ LOG.info("-------docker mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+ LOG.info("-------docker mysql database:{}", MYSQL_CONTAINER.getDatabaseName());
+
+ CompletableFuture<Void> executeJobFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mysqlcdc_to_console.conf");
Review Comment:
Please rename `mysqlcdc_to_console.conf` to `mysqlcdc_to_jdbc.conf`.
```suggestion
container.executeJob("/mysqlcdc_to_jdbc.conf");
```
[GitHub] [incubator-seatunnel] hailin0 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase #4603
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1517296949
Please check the CI error.
[GitHub] [incubator-seatunnel] zhilinli123 commented on pull request #4639: [Feature][E2E] Add mysql-cdc e2e testcase
Posted by "zhilinli123 (via GitHub)" <gi...@apache.org>.
zhilinli123 commented on PR #4639:
URL: https://github.com/apache/incubator-seatunnel/pull/4639#issuecomment-1519296090
> merge dev branch
@hailin0 I have merged the dev branch. Could you help run CI?