Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/07/12 07:47:23 UTC

[GitHub] [incubator-seatunnel] legendtkl commented on a diff in pull request #2166: [feature][Flink SQL CDC] Add "MySQL CDC Source Connector" & Support to set flink execution environment

legendtkl commented on code in PR #2166:
URL: https://github.com/apache/incubator-seatunnel/pull/2166#discussion_r918641341


##########
docs/en/connector/flink-sql/FlinkEnvConfiguration.md:
##########
@@ -0,0 +1,34 @@
+# Flink Env Configuration

Review Comment:
   I don't think this is necessary. We can already set configuration with a `SET` clause such as `SET parallelism.default = 3`.



##########
seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java:
##########
@@ -57,7 +57,9 @@ public class Executor {
     private static final Logger LOGGER = LoggerFactory.getLogger(Executor.class);
 
     private static final String FLINK_SQL_SET_MATCHING_REGEX = "SET(\\s+(\\S+)\\s*=(.*))?";
-    private static final int FLINK_SQL_SET_OPERANDS = 3;
+    /** env config regex */
+    private static final String FLINK_SQL_CONF_MATCHING_REGEX = "CONF(\\s+(\\S+)\\s*=(.*))?";

Review Comment:
   I think we can reuse `FLINK_SQL_SET_MATCHING_REGEX`.
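
A minimal sketch of why the existing pattern may be enough: the regex string below is copied from `FLINK_SQL_SET_MATCHING_REGEX` in the hunk above, and it already extracts a key and value from a `SET key = value` statement. The class name `SetStatementSketch` and the method `parse` are hypothetical and are not part of `Executor.java`; the input is assumed to arrive without a trailing semicolon.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class, not part of the PR.
public class SetStatementSketch {
    // Pattern string copied from FLINK_SQL_SET_MATCHING_REGEX in the diff above.
    private static final Pattern SET_PATTERN =
        Pattern.compile("SET(\\s+(\\S+)\\s*=(.*))?");

    /** Returns {key, value} for "SET key = value", or empty for anything else. */
    public static Optional<String[]> parse(String stmt) {
        Matcher m = SET_PATTERN.matcher(stmt.trim());
        // A bare "SET" matches the pattern but leaves the key group null.
        if (!m.matches() || m.group(2) == null) {
            return Optional.empty();
        }
        return Optional.of(new String[] {m.group(2), m.group(3).trim()});
    }

    public static void main(String[] args) {
        // The checkpoint interval from the docs, written with SET instead of CONF.
        parse("SET execution.checkpointing.interval = 1min")
            .ifPresent(kv -> System.out.println(kv[0] + " -> " + kv[1]));
    }
}
```

Under these assumptions, a statement starting with `CONF` simply does not match, so dropping the second regex would mean writing environment settings with `SET` as well.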



##########
docs/en/connector/flink-sql/MySQL-CDC.md:
##########
@@ -0,0 +1,78 @@
+# Flink SQL MySQL CDC Connector
+
+## Description
+
+We can use the Flink Mysql CDC Connector allows for reading snapshot data and incremental data from MySQL database. Refer to the [Flink CDC MySQL CDC Connector](https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/mysql-cdc.html#connector-options) for more information.

Review Comment:
   Suggested wording: "We can use the Flink MySQL CDC Connector to read snapshot data and incremental data from a MySQL database."



##########
docs/en/connector/flink-sql/MySQL-CDC.md:
##########
@@ -0,0 +1,78 @@
+# Flink SQL MySQL CDC Connector
+
+## Description
+
+We can use the Flink Mysql CDC Connector allows for reading snapshot data and incremental data from MySQL database. Refer to the [Flink CDC MySQL CDC Connector](https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/mysql-cdc.html#connector-options) for more information.
+
+## Usage
+
+### 1. Flink MySQL CDC Version
+
+flink-sql-connector-mysql-cdc-2.1.1
+
+### 2. prepare data
+
+Start mysql server locally, and create a database named "test" and a table named "test_table" in the database.
+
+The table "test_table" could be created by the following SQL:
+
+```sql
+CREATE TABLE IF NOT EXISTS `test_table`(
+   `id` INT UNSIGNED AUTO_INCREMENT,
+   `class_id` INT,
+   `student_id` INT,
+   `course` VARCHAR(100),
+   PRIMARY KEY ( `id` )
+)ENGINE=InnoDB DEFAULT CHARSET=utf8;
+```
+
+Insert some data into the table "test_table".
+
+### 3. seatunnel config 
+
+Prepare a seatunnel config file with the following content:
+
+```sql
+SET 'table.dml-sync' = 'true';

Review Comment:
   I tested this, and the single quote character (') seems to stop the config from taking effect. Could you double-check?
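
One possible explanation, sketched under assumptions: if the key is taken straight from the capture group of the `SET` regex quoted elsewhere in this review, the surrounding single quotes stay part of the key. The class `QuotedSetDemo` and the helpers `rawKey` and `stripQuotes` are hypothetical and only illustrate the effect; how `Executor` actually handles the captured groups is not shown in this email.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class, not part of the PR.
public class QuotedSetDemo {
    // Pattern string copied from FLINK_SQL_SET_MATCHING_REGEX in this review.
    private static final Pattern SET_PATTERN =
        Pattern.compile("SET(\\s+(\\S+)\\s*=(.*))?");

    /** Returns the key exactly as the regex captures it, or null if there is no match. */
    public static String rawKey(String stmt) {
        Matcher m = SET_PATTERN.matcher(stmt.trim());
        return m.matches() ? m.group(2) : null;
    }

    /** Hypothetical helper: drops one pair of surrounding single quotes, if present. */
    public static String stripQuotes(String s) {
        String t = s.trim();
        if (t.length() >= 2 && t.startsWith("'") && t.endsWith("'")) {
            return t.substring(1, t.length() - 1);
        }
        return t;
    }

    public static void main(String[] args) {
        String raw = rawKey("SET 'table.dml-sync' = 'true'");
        System.out.println(raw);              // prints 'table.dml-sync' (quotes included)
        System.out.println(stripQuotes(raw)); // prints table.dml-sync
    }
}
```

If this is the cause, either the docs example could drop the quotes or the parser could strip them before applying the option.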



##########
docs/en/connector/flink-sql/MySQL-CDC.md:
##########
@@ -0,0 +1,78 @@
+# Flink SQL MySQL CDC Connector
+
+## Description
+
+We can use the Flink Mysql CDC Connector allows for reading snapshot data and incremental data from MySQL database. Refer to the [Flink CDC MySQL CDC Connector](https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/mysql-cdc.html#connector-options) for more information.
+
+## Usage
+
+### 1. Flink MySQL CDC Version
+
+flink-sql-connector-mysql-cdc-2.1.1
+
+### 2. prepare data
+
+Start mysql server locally, and create a database named "test" and a table named "test_table" in the database.
+
+The table "test_table" could be created by the following SQL:
+
+```sql
+CREATE TABLE IF NOT EXISTS `test_table`(
+   `id` INT UNSIGNED AUTO_INCREMENT,
+   `class_id` INT,
+   `student_id` INT,
+   `course` VARCHAR(100),
+   PRIMARY KEY ( `id` )
+)ENGINE=InnoDB DEFAULT CHARSET=utf8;
+```
+
+Insert some data into the table "test_table".
+
+### 3. seatunnel config 
+
+Prepare a seatunnel config file with the following content:
+
+```sql
+SET 'table.dml-sync' = 'true';
+SET 'sql-client.verbose' = 'true';
+SET 'sql-client.execution.result-mode' = 'tableau';
+
+-- set checkpoint interval
+CONF execution.checkpointing.interval = 1min;
+
+CREATE TABLE IF NOT EXISTS test_table(

Review Comment:
   It would be better if the code were formatted.



##########
docs/en/connector/flink-sql/MySQL-CDC.md:
##########
@@ -0,0 +1,78 @@
+# Flink SQL MySQL CDC Connector
+
+## Description
+
+We can use the Flink Mysql CDC Connector allows for reading snapshot data and incremental data from MySQL database. Refer to the [Flink CDC MySQL CDC Connector](https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/mysql-cdc.html#connector-options) for more information.
+
+## Usage
+
+### 1. Flink MySQL CDC Version
+
+flink-sql-connector-mysql-cdc-2.1.1
+
+### 2. prepare data
+
+Start mysql server locally, and create a database named "test" and a table named "test_table" in the database.
+
+The table "test_table" could be created by the following SQL:
+
+```sql
+CREATE TABLE IF NOT EXISTS `test_table`(
+   `id` INT UNSIGNED AUTO_INCREMENT,
+   `class_id` INT,
+   `student_id` INT,
+   `course` VARCHAR(100),
+   PRIMARY KEY ( `id` )
+)ENGINE=InnoDB DEFAULT CHARSET=utf8;
+```
+
+Insert some data into the table "test_table".
+
+### 3. seatunnel config 
+
+Prepare a seatunnel config file with the following content:
+
+```sql
+SET 'table.dml-sync' = 'true';
+SET 'sql-client.verbose' = 'true';
+SET 'sql-client.execution.result-mode' = 'tableau';
+
+-- set checkpoint interval
+CONF execution.checkpointing.interval = 1min;

Review Comment:
   Can we reuse `SET` here?



##########
seatunnel-core/seatunnel-core-flink-sql/pom.xml:
##########
@@ -60,6 +60,16 @@
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>

Review Comment:
   I am not sure whether it is OK to pull in the flink-runtime-web dependency. Maybe it is useful for debugging. cc @ruanwenjun



##########
seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java:
##########
@@ -106,6 +109,11 @@ private static StatementSet handleStatements(String workFlowContent, StreamTable
 
         List<String> stmts = SqlStatementSplitter.normalizeStatements(workFlowContent);
         for (String stmt : stmts) {
+            Optional<Pair<String, String>> confOptional = parseConfOperation(stmt);
+            // skip env config
+            if (confOptional.isPresent()) {
+                continue;
+            }

Review Comment:
   We need to discuss whether `CONF` is necessary.


