Posted to commits@doris.apache.org by ji...@apache.org on 2023/06/05 03:08:21 UTC
[doris-flink-connector] branch master updated: [Feature]add mysql database sync (#141)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9a9905b [Feature]add mysql database sync (#141)
9a9905b is described below
commit 9a9905bbaad25f982f9b08c370ba45118299aa57
Author: wudi <67...@qq.com>
AuthorDate: Mon Jun 5 11:08:15 2023 +0800
[Feature]add mysql database sync (#141)
---
flink-doris-connector/build.sh | 9 +-
flink-doris-connector/pom.xml | 32 ++-
.../apache/doris/flink/catalog/DorisCatalog.java | 3 +
.../doris/flink/catalog/doris/DataModel.java | 23 +++
.../doris/flink/catalog/doris/DorisSystem.java | 230 +++++++++++++++++++++
.../doris/flink/catalog/doris/DorisType.java | 42 ++++
.../doris/flink/catalog/doris/FieldSchema.java | 56 +++++
.../doris/flink/catalog/doris/TableSchema.java | 97 +++++++++
.../doris/flink/cfg/DorisExecutionOptions.java | 2 +-
.../flink/exception/CreateTableException.java | 45 ++++
.../doris/flink/sink/writer/DorisWriter.java | 11 +-
.../doris/flink/table/DorisConfigOptions.java | 14 ++
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 101 +++++++++
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 217 +++++++++++++++++++
.../flink/tools/cdc}/DateToStringConverter.java | 10 +-
.../apache/doris/flink/tools/cdc/SourceSchema.java | 104 ++++++++++
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 192 +++++++++++++++++
.../doris/flink/tools/cdc/mysql/MysqlType.java | 168 +++++++++++++++
.../tools/cdc/mysql/ParsingProcessFunction.java | 65 ++++++
.../apache/doris/flink/CDCSchemaChangeExample.java | 5 +-
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 73 +++++++
.../doris/flink/utils/DateToStringConverter.java | 2 +-
22 files changed, 1466 insertions(+), 35 deletions(-)
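At a high level, the commit adds a whole-database sync pipeline: DatabaseSync/MysqlDatabaseSync read the MySQL table metadata, derive Doris table schemas, create the target tables through DorisSystem, and route the CDC change stream into one DorisSink per table. A minimal programmatic sketch of how the new pieces are wired together (host names, ports, credentials and database names below are placeholders, not part of the commit):

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlSyncDatabaseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // MySQL source options (keys follow MySqlSourceOptions).
        Map<String, String> mysqlMap = new HashMap<>();
        mysqlMap.put("hostname", "127.0.0.1");
        mysqlMap.put("port", "3306");
        mysqlMap.put("username", "root");
        mysqlMap.put("password", "123456");
        mysqlMap.put("database-name", "test_db");

        // Doris sink options (keys follow DorisConfigOptions).
        Map<String, String> sinkMap = new HashMap<>();
        sinkMap.put("fenodes", "127.0.0.1:8030");
        sinkMap.put("username", "root");
        sinkMap.put("password", "");
        sinkMap.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
        sinkMap.put("sink.label-prefix", "label");

        DatabaseSync databaseSync = new MysqlDatabaseSync();
        // args: database, table prefix/suffix, including/excluding table patterns, sink conf, table conf
        databaseSync.create(env, "test_db", Configuration.fromMap(mysqlMap),
                "", "", null, null, Configuration.fromMap(sinkMap), new HashMap<>());
        databaseSync.build();
        env.execute("MySQL-Doris Sync Database: test_db");
    }
}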
diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh
index 9dbe69d..5816e86 100755
--- a/flink-doris-connector/build.sh
+++ b/flink-doris-connector/build.sh
@@ -147,10 +147,15 @@ elif [ ${flinkVer} -eq 3 ]; then
FLINK_VERSION="1.17.0"
fi
-echo_g " flink version: ${FLINK_VERSION}"
+# extract the minor version from FLINK_VERSION
+# e.g. 1.17.0 -> 1.17
+FLINK_MINOR_VERSION=0
+[ ${FLINK_VERSION} != 0 ] && FLINK_MINOR_VERSION=${FLINK_VERSION%.*}
+
+echo_g " flink version: ${FLINK_VERSION}, minor version: ${FLINK_MINOR_VERSION}"
echo_g " build starting..."
-${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} "$@"
+${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.minor.version=${FLINK_MINOR_VERSION} "$@"
EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 0d0e6c4..025bcde 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -26,7 +26,7 @@ under the License.
<version>23</version>
</parent>
<groupId>org.apache.doris</groupId>
- <artifactId>flink-doris-connector</artifactId>
+ <artifactId>flink-doris-connector-${flink.minor.version}</artifactId>
<version>1.4.0-SNAPSHOT</version>
<name>Flink Doris Connector</name>
<url>https://doris.apache.org/</url>
@@ -68,6 +68,7 @@ under the License.
<properties>
<flink.version>1.15.0</flink.version>
+ <flink.minor.version>1.15</flink.minor.version>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
@@ -237,14 +238,20 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
-
+ <!-- use cdc bundled jar for kafka connect class-->
<dependency>
<groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
+ <artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
@@ -393,19 +400,6 @@ under the License.
</build>
<profiles>
-
- <profile>
- <id>thirdparty</id>
- <activation>
- <property>
- <name>env.DORIS_THIRDPARTY</name>
- </property>
- </activation>
- <properties>
- <doris.thirdparty>${env.DORIS_THIRDPARTY}</doris.thirdparty>
- </properties>
- </profile>
-
<!-- for custom internal repository -->
<profile>
<id>custom-env</id>
@@ -478,4 +472,4 @@ under the License.
</profile>
</profiles>
-</project>
\ No newline at end of file
+</project>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 3b5834f..ef735e5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -81,6 +81,9 @@ import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
+/**
+ * catalog for flink
+ */
public class DorisCatalog extends AbstractCatalog {
private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java
new file mode 100644
index 0000000..03b87a0
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+public enum DataModel {
+ DUPLICATE,
+ UNIQUE,
+ AGGREGATE
+}
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
new file mode 100644
index 0000000..36aba1c
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.apache.doris.flink.connection.JdbcConnectionProvider;
+import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
+import org.apache.doris.flink.exception.CreateTableException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Doris system operations helper (list/create databases and tables via JDBC).
+ */
+@Public
+public class DorisSystem {
+ private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
+ private JdbcConnectionProvider jdbcConnectionProvider;
+ private static final List<String> builtinDatabases = Arrays.asList("information_schema");
+
+ public DorisSystem(DorisConnectionOptions options) {
+ this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
+ }
+
+ public List<String> listDatabases() throws Exception {
+ return extractColumnValuesBySQL(
+ "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+ 1,
+ dbName -> !builtinDatabases.contains(dbName));
+ }
+
+ public boolean databaseExists(String database) throws Exception {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
+ return listDatabases().contains(database);
+ }
+
+ public boolean createDatabase(String database) throws Exception {
+ execute(String.format("CREATE DATABASE %s", database));
+ return true;
+ }
+
+ public boolean tableExists(String database, String table){
+ try {
+ return databaseExists(database)
+ && listTables(database).contains(table);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ public List<String> listTables(String databaseName) throws Exception {
+ if (!databaseExists(databaseName)) {
+ throw new DorisRuntimeException("database " + databaseName + " does not exist");
+ }
+ return extractColumnValuesBySQL(
+ "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+ 1,
+ null,
+ databaseName);
+ }
+
+ public void createTable(TableSchema schema) throws Exception {
+ String ddl = buildCreateTableDDL(schema);
+ LOG.info("Create table with ddl:{}", ddl);
+ execute(ddl);
+ }
+
+ public void execute(String sql) throws Exception {
+ Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+ try (Statement statement = conn.createStatement()) {
+ statement.execute(sql);
+ }
+ }
+
+ private List<String> extractColumnValuesBySQL(
+ String sql,
+ int columnIndex,
+ Predicate<String> filterFunc,
+ Object... params) throws Exception {
+
+ Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+ List<String> columnValues = Lists.newArrayList();
+
+ try (PreparedStatement ps = conn.prepareStatement(sql)) {
+ if (Objects.nonNull(params) && params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ ps.setObject(i + 1, params[i]);
+ }
+ }
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String columnValue = rs.getString(columnIndex);
+ if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+ columnValues.add(columnValue);
+ }
+ }
+ return columnValues;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "The following SQL query could not be executed: %s", sql),
+ e);
+ }
+ }
+
+ public String buildCreateTableDDL(TableSchema schema) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+ sb.append(identifier(schema.getDatabase()))
+ .append(".")
+ .append(identifier(schema.getTable()))
+ .append("(");
+
+ Map<String, FieldSchema> fields = schema.getFields();
+ List<String> keys = schema.getKeys();
+ //append keys
+ for(String key : keys){
+ if(!fields.containsKey(key)){
+ throw new CreateTableException("key " + key + " not found in column list");
+ }
+ FieldSchema field = fields.get(key);
+ buildColumn(sb, field);
+ }
+
+ //append values
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ if(keys.contains(entry.getKey())){
+ continue;
+ }
+ FieldSchema field = entry.getValue();
+ buildColumn(sb, field);
+
+ }
+ sb = sb.deleteCharAt(sb.length() -1);
+ sb.append(" ) ");
+ //append model
+ sb.append(schema.getModel().name())
+ .append(" KEY(")
+ .append(String.join(",", identifier(schema.getKeys())))
+ .append(")");
+
+ //append table comment
+ if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){
+ sb.append(" COMMENT '")
+ .append(schema.getTableComment())
+ .append("' ");
+ }
+
+ //append distribute key
+ sb.append(" DISTRIBUTED BY HASH(")
+ .append(String.join(",", identifier(schema.getDistributeKeys())))
+ .append(") BUCKETS AUTO ");
+
+ //append properties
+ int index = 0;
+ for (Map.Entry<String, String> entry : schema.getProperties().entrySet()) {
+ if (index == 0) {
+ sb.append(" PROPERTIES (");
+ }
+ if (index > 0) {
+ sb.append(",");
+ }
+ sb.append(quoteProperties(entry.getKey()))
+ .append("=")
+ .append(quoteProperties(entry.getValue()));
+ index++;
+
+ if (index == schema.getProperties().size()) {
+ sb.append(")");
+ }
+ }
+ return sb.toString();
+ }
+
+ private void buildColumn(StringBuilder sql, FieldSchema field){
+ sql.append(identifier(field.getName()))
+ .append(" ")
+ .append(field.getTypeString())
+ .append(" COMMENT '")
+ .append(field.getComment())
+ .append("',");
+ }
+
+ private List<String> identifier(List<String> name) {
+ List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
+ return result;
+ }
+
+ private String identifier(String name) {
+ return "`" + name + "`";
+ }
+
+ private String quoteProperties(String name) {
+ return "'" + name + "'";
+ }
+
+}
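To make the shape of the generated DDL concrete, here is a hedged sketch of driving buildCreateTableDDL by hand; the table, columns and the fenodes/JDBC endpoints are made-up examples, and in the real flow the TableSchema comes from SourceSchema.convertTableSchema:

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;

public class CreateTableDdlSketch {
    public static void main(String[] args) {
        DorisConnectionOptions options = new DorisConnectionOptions.DorisConnectionOptionsBuilder()
                .withFenodes("127.0.0.1:8030")
                .withUsername("root")
                .withPassword("")
                .withJdbcUrl("jdbc:mysql://127.0.0.1:9030")
                .build();

        TableSchema schema = new TableSchema();
        schema.setDatabase("test_db");
        schema.setTable("user");
        schema.setModel(DataModel.UNIQUE);
        schema.setKeys(Arrays.asList("id"));
        schema.setDistributeKeys(Arrays.asList("id"));

        LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
        fields.put("id", new FieldSchema("id", "BIGINT", "primary key"));
        fields.put("name", new FieldSchema("name", "VARCHAR(128)", "user name"));
        schema.setFields(fields);

        Map<String, String> props = new HashMap<>();
        props.put("light_schema_change", "true");
        schema.setProperties(props);

        // Prints roughly:
        // CREATE TABLE IF NOT EXISTS `test_db`.`user`(`id` BIGINT COMMENT 'primary key',
        //   `name` VARCHAR(128) COMMENT 'user name' ) UNIQUE KEY(`id`)
        //   DISTRIBUTED BY HASH(`id`) BUCKETS AUTO  PROPERTIES ('light_schema_change'='true')
        System.out.println(new DorisSystem(options).buildCreateTableDDL(schema));
    }
}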
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
new file mode 100644
index 0000000..1e43ac1
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+public class DorisType {
+ public static final String BOOLEAN = "BOOLEAN";
+ public static final String TINYINT = "TINYINT";
+ public static final String SMALLINT = "SMALLINT";
+ public static final String INT = "INT";
+ public static final String BIGINT = "BIGINT";
+ public static final String LARGEINT = "LARGEINT";
+ public static final String FLOAT = "FLOAT";
+ public static final String DOUBLE = "DOUBLE";
+ public static final String DECIMAL = "DECIMAL";
+ public static final String DECIMAL_V3 = "DECIMALV3";
+ public static final String DATE = "DATE";
+ public static final String DATE_V2 = "DATEV2";
+ public static final String DATETIME = "DATETIME";
+ public static final String DATETIME_V2 = "DATETIMEV2";
+ public static final String CHAR = "CHAR";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String STRING = "STRING";
+ public static final String HLL = "HLL";
+ public static final String BITMAP = "BITMAP";
+ public static final String ARRAY = "ARRAY";
+ public static final String JSONB = "JSONB";
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
new file mode 100644
index 0000000..8255bd3
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+public class FieldSchema {
+ private String name;
+ private String typeString;
+ private String comment;
+
+ public FieldSchema() {
+ }
+
+ public FieldSchema(String name, String typeString, String comment) {
+ this.name = name;
+ this.typeString = typeString;
+ this.comment = comment;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getTypeString() {
+ return typeString;
+ }
+
+ public void setTypeString(String typeString) {
+ this.typeString = typeString;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public void setComment(String comment) {
+ this.comment = comment;
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
new file mode 100644
index 0000000..8f04705
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TableSchema {
+ private String database;
+ private String table;
+ private String tableComment;
+ private Map<String, FieldSchema> fields;
+ private List<String> keys = new ArrayList<>();
+ private DataModel model = DataModel.DUPLICATE;
+ private List<String> distributeKeys = new ArrayList<>();
+ private Map<String, String> properties = new HashMap<>();
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getTableComment() {
+ return tableComment;
+ }
+
+ public Map<String, FieldSchema> getFields() {
+ return fields;
+ }
+
+ public List<String> getKeys() {
+ return keys;
+ }
+
+ public DataModel getModel() {
+ return model;
+ }
+
+ public List<String> getDistributeKeys() {
+ return distributeKeys;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public void setTableComment(String tableComment) {
+ this.tableComment = tableComment;
+ }
+
+ public void setFields(Map<String, FieldSchema> fields) {
+ this.fields = fields;
+ }
+
+ public void setKeys(List<String> keys) {
+ this.keys = keys;
+ }
+
+ public void setModel(DataModel model) {
+ this.model = model;
+ }
+
+ public void setDistributeKeys(List<String> distributeKeys) {
+ this.distributeKeys = distributeKeys;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 36c577a..722f6ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -122,7 +122,7 @@ public class DorisExecutionOptions implements Serializable {
private int bufferCount = DEFAULT_BUFFER_COUNT;
private String labelPrefix = "";
private Properties streamLoadProp = new Properties();
- private boolean enableDelete = false;
+ private boolean enableDelete = true;
private boolean enable2PC = true;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java
new file mode 100644
index 0000000..929346c
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.exception;
+
+/**
+ * Create Table exception.
+ */
+public class CreateTableException extends RuntimeException {
+ public CreateTableException() {
+ super();
+ }
+
+ public CreateTableException(String message) {
+ super(message);
+ }
+
+ public CreateTableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CreateTableException(Throwable cause) {
+ super(cause);
+ }
+
+ protected CreateTableException(String message, Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 711f765..642e1d3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -130,15 +130,16 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
+ byte[] serialize = serializer.serialize(in);
+ if(Objects.isNull(serialize)){
+ //ddl record
+ return;
+ }
if(!loading) {
//Start streamload only when there has data
dorisStreamLoad.startLoad(currentLabel);
loading = true;
}
- byte[] serialize = serializer.serialize(in);
- if(Objects.isNull(serialize)){
- return;
- }
dorisStreamLoad.writeRecord(serialize);
}
@@ -254,7 +255,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
backend = "http://" + backend;
URL url = new URL(backend);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(1000);
+ co.setConnectTimeout(60000);
co.connect();
co.disconnect();
return true;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index a637967..3e129aa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -22,6 +22,8 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
@@ -196,4 +198,16 @@ public class DorisConfigOptions {
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+ public static Properties getStreamLoadProp(Map<String, String> tableOptions) {
+ final Properties streamLoadProp = new Properties();
+
+ for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
+ if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) {
+ String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length());
+ streamLoadProp.put(subKey, entry.getValue());
+ }
+ }
+ return streamLoadProp;
+ }
+
}
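The new getStreamLoadProp helper simply strips the "sink.properties." prefix and forwards everything else untouched; options without the prefix are ignored. A small sketch of the mapping (the option values are examples):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.doris.flink.table.DorisConfigOptions;

public class StreamLoadPropSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("sink.properties.format", "json");
        tableOptions.put("sink.properties.read_json_by_line", "true");
        tableOptions.put("fenodes", "127.0.0.1:8030"); // no prefix, so not forwarded

        Properties props = DorisConfigOptions.getStreamLoadProp(tableOptions);
        // props now holds {format=json, read_json_by_line=true}
        System.out.println(props);
    }
}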
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
new file mode 100644
index 0000000..809c4ea
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CDC sync tool entry point.
+ */
+public class CdcTools {
+ private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
+ private static final List<String> EMPTY_KEYS = Arrays.asList("password");
+
+ public static void main(String[] args) throws Exception {
+ String operation = args[0].toLowerCase();
+ String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
+ System.out.println();
+ switch (operation) {
+ case MYSQL_SYNC_DATABASE:
+ createMySQLSyncDatabase(opArgs);
+ break;
+ default:
+ System.out.println("Unknown operation " + operation);
+ System.exit(1);
+ }
+ }
+
+ private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ String jobName = params.get("job-name");
+ String database = params.get("database");
+ String tablePrefix = params.get("table-prefix");
+ String tableSuffix = params.get("table-suffix");
+ String includingTables = params.get("including-tables");
+ String excludingTables = params.get("excluding-tables");
+
+ Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
+ Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
+ Map<String, String> tableMap = getConfigMap(params, "table-conf");
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
+ Configuration sinkConfig = Configuration.fromMap(sinkMap);
+
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ databaseSync.create(env, database, mysqlConfig, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap);
+ databaseSync.build();
+
+ if(StringUtils.isNullOrWhitespaceOnly(jobName)){
+ jobName = String.format("MySQL-Doris Sync Database: %s", mysqlMap.get("database-name"));
+ }
+ env.execute(jobName);
+ }
+
+ private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
+ if (!params.has(key)) {
+ return null;
+ }
+
+ Map<String, String> map = new HashMap<>();
+ for (String param : params.getMultiParameter(key)) {
+ String[] kv = param.split("=");
+ if (kv.length == 2) {
+ map.put(kv[0], kv[1]);
+ continue;
+ }else if(kv.length == 1 && EMPTY_KEYS.contains(kv[0])){
+ map.put(kv[0], "");
+ continue;
+ }
+
+ System.err.println(
+ "Invalid " + key + " " + param + ".\n");
+ return null;
+ }
+ return map;
+ }
+}
\ No newline at end of file
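CdcTools is the command-line entry point: the first argument selects the operation and the remaining --key value pairs are parsed with MultipleParameterTool, with --mysql-conf, --sink-conf and --table-conf repeatable. A hedged sketch of an equivalent invocation, with placeholder hosts, credentials and table properties:

import org.apache.doris.flink.tools.cdc.CdcTools;

public class CdcToolsInvocationSketch {
    public static void main(String[] args) throws Exception {
        CdcTools.main(new String[]{
                "mysql-sync-database",
                "--database", "test_db",
                "--including-tables", "user|orders",
                "--mysql-conf", "hostname=127.0.0.1",
                "--mysql-conf", "port=3306",
                "--mysql-conf", "username=root",
                "--mysql-conf", "password=123456",
                "--mysql-conf", "database-name=test_db",
                "--sink-conf", "fenodes=127.0.0.1:8030",
                "--sink-conf", "username=root",
                "--sink-conf", "password=", // an empty value is only accepted for "password"
                "--sink-conf", "jdbc-url=jdbc:mysql://127.0.0.1:9030",
                "--sink-conf", "sink.label-prefix=label",
                "--table-conf", "replication_num=1"
        });
    }
}

In a packaged deployment the same arguments would follow the connector jar on the flink run command line.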
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
new file mode 100644
index 0000000..84d5b57
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.table.DorisConfigOptions;
+import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+public abstract class DatabaseSync {
+ private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
+ private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+ protected Configuration config;
+ protected String database;
+ protected TableNameConverter converter;
+ protected Pattern includingPattern;
+ protected Pattern excludingPattern;
+ protected Map<String, String> tableConfig;
+ protected Configuration sinkConfig;
+ public StreamExecutionEnvironment env;
+
+ public abstract Connection getConnection() throws SQLException;
+
+ public abstract List<SourceSchema> getSchemaList() throws Exception;
+
+ public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
+
+
+ public void create(StreamExecutionEnvironment env, String database, Configuration config,
+ String tablePrefix, String tableSuffix, String includingTables,
+ String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig) {
+ this.env = env;
+ this.config = config;
+ this.database = database;
+ this.converter = new TableNameConverter(tablePrefix, tableSuffix);
+ this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
+ this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
+ this.sinkConfig = sinkConfig;
+ this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
+ //default enable light schema change
+ if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
+ this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
+ }
+ }
+
+ public void build() throws Exception {
+ DorisConnectionOptions options = getDorisConnectionOptions();
+ DorisSystem dorisSystem = new DorisSystem(options);
+
+ List<SourceSchema> schemaList = getSchemaList();
+ if (!dorisSystem.databaseExists(database)) {
+ LOG.info("database {} does not exist, creating it", database);
+ dorisSystem.createDatabase(database);
+ }
+
+ List<String> syncTables = new ArrayList<>();
+ List<String> dorisTables = new ArrayList<>();
+ for (SourceSchema schema : schemaList) {
+ syncTables.add(schema.getTableName());
+ String dorisTable = converter.convert(schema.getTableName());
+ if (!dorisSystem.tableExists(database, dorisTable)) {
+ TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
+ //set doris target database
+ dorisSchema.setDatabase(database);
+ dorisSchema.setTable(dorisTable);
+ dorisSystem.createTable(dorisSchema);
+ }
+ dorisTables.add(dorisTable);
+ }
+ Preconditions.checkState(!syncTables.isEmpty(), "No tables to be synchronized.");
+ config.set(MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", syncTables) + ")");
+
+ DataStreamSource<String> streamSource = buildCdcSource(env);
+ SingleOutputStreamOperator<Void> parsedStream = streamSource.process(new ParsingProcessFunction(converter));
+ for (String table : dorisTables) {
+ OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
+ DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
+
+ int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+ sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table);
+ }
+ }
+
+ private DorisConnectionOptions getDorisConnectionOptions() {
+ String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+ String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
+ String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
+ String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
+ Preconditions.checkNotNull(fenodes, "fenodes is empty in sink-conf");
+ Preconditions.checkNotNull(user, "username is empty in sink-conf");
+ Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf");
+ DorisConnectionOptions.DorisConnectionOptionsBuilder builder = new DorisConnectionOptions.DorisConnectionOptionsBuilder()
+ .withFenodes(fenodes)
+ .withUsername(user)
+ .withPassword(passwd)
+ .withJdbcUrl(jdbcUrl);
+ return builder.build();
+ }
+
+ /**
+ * create doris sink
+ */
+ public DorisSink<String> buildDorisSink(String table) {
+ String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+ String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
+ String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
+ String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
+
+ DorisSink.Builder<String> builder = DorisSink.builder();
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes(fenodes)
+ .setTableIdentifier(database + "." + table)
+ .setUsername(user)
+ .setPassword(passwd);
+
+ Properties pro = new Properties();
+ //default json data format
+ pro.setProperty("format", "json");
+ pro.setProperty("read_json_by_line", "true");
+ //custom stream load properties
+ Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
+ pro.putAll(streamLoadProp);
+ DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder()
+ .setLabelPrefix(String.join("-", labelPrefix, database, table))
+ .setStreamLoadProp(pro);
+
+ sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable);
+ sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount);
+ sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize);
+ sinkConfig.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL).ifPresent(executionBuilder::setCheckInterval);
+ sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries);
+
+ boolean enable2pc = sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC);
+ if(!enable2pc){
+ executionBuilder.disable2PC();
+ }
+ builder.setDorisReadOptions(DorisReadOptions.builder().build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
+ .setDorisOptions(dorisBuilder.build());
+ return builder.build();
+ }
+
+ /**
+ * Filter tables that need to be synchronized.
+ */
+ protected boolean isSyncNeeded(String tableName) {
+ boolean sync = true;
+ if (includingPattern != null) {
+ sync = includingPattern.matcher(tableName).matches();
+ }
+ if (excludingPattern != null) {
+ sync = sync && !excludingPattern.matcher(tableName).matches();
+ }
+ LOG.debug("table {} is synchronized? {}", tableName, sync);
+ return sync;
+ }
+
+ public static class TableNameConverter implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String prefix;
+ private final String suffix;
+
+ TableNameConverter(){
+ this("","");
+ }
+
+ TableNameConverter(String prefix, String suffix) {
+ this.prefix = prefix == null ? "" : prefix;
+ this.suffix = suffix == null ? "" : suffix;
+ }
+
+ public String convert(String tableName) {
+ return prefix + tableName + suffix;
+ }
+ }
+}
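Two small behaviors in DatabaseSync worth calling out: including-tables/excluding-tables are applied with Matcher.matches(), i.e. against the whole table name, and TableNameConverter just wraps the source name with the configured prefix/suffix. A sketch of the renaming (placed in the same package because the converter's constructors are package-private; the names are hypothetical):

package org.apache.doris.flink.tools.cdc;

public class TableNameConverterSketch {
    public static void main(String[] args) {
        DatabaseSync.TableNameConverter converter =
                new DatabaseSync.TableNameConverter("ods_", "_doris");
        // Prints "ods_user_doris": the Doris table name is prefix + source table + suffix.
        System.out.println(converter.convert("user"));
    }
}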
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java
similarity index 96%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java
index 9d73f53..ed5b2b6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.utils;
+package org.apache.doris.flink.tools.cdc;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
-import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,11 +47,11 @@ public class DateToStringConverter implements CustomConverter<SchemaBuilder, Rel
static {
DEFAULT_PROPS.setProperty("converters", "date");
- DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.utils.DateToStringConverter");
+ DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.tools.cdc.DateToStringConverter");
DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd HH:mm:ss");
- DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss");
- DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC");
+ DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS");
+ DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC+8");
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
new file mode 100644
index 0000000..03aa90e
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SourceSchema {
+ private final String databaseName;
+ private final String tableName;
+ private final String tableComment;
+ private final LinkedHashMap<String, FieldSchema> fields;
+ public final List<String> primaryKeys;
+
+ public SourceSchema(
+ DatabaseMetaData metaData, String databaseName, String tableName, String tableComment)
+ throws Exception {
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ this.tableComment = tableComment;
+
+ fields = new LinkedHashMap<>();
+ try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) {
+ while (rs.next()) {
+ String fieldName = rs.getString("COLUMN_NAME");
+ String comment = rs.getString("REMARKS");
+ String fieldType = rs.getString("TYPE_NAME");
+ Integer precision = rs.getInt("COLUMN_SIZE");
+
+ if (rs.wasNull()) {
+ precision = null;
+ }
+ Integer scale = rs.getInt("DECIMAL_DIGITS");
+ if (rs.wasNull()) {
+ scale = null;
+ }
+ String dorisTypeStr = MysqlType.toDorisType(fieldType, precision, scale);
+ fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
+ }
+ }
+
+ primaryKeys = new ArrayList<>();
+ try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
+ while (rs.next()) {
+ String fieldName = rs.getString("COLUMN_NAME");
+ primaryKeys.add(fieldName);
+ }
+ }
+ }
+
+ public TableSchema convertTableSchema(Map<String, String> tableProps) {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setModel(DataModel.UNIQUE);
+ tableSchema.setFields(this.fields);
+ tableSchema.setKeys(this.primaryKeys);
+ tableSchema.setTableComment(this.tableComment);
+ tableSchema.setDistributeKeys(this.primaryKeys);
+ tableSchema.setProperties(tableProps);
+ return tableSchema;
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public LinkedHashMap<String, FieldSchema> getFields() {
+ return fields;
+ }
+
+ public List<String> getPrimaryKeys() {
+ return primaryKeys;
+ }
+
+ public String getTableComment() {
+ return tableComment;
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
new file mode 100644
index 0000000..caa975d
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.DateToStringConverter;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class MysqlDatabaseSync extends DatabaseSync {
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class);
+
+ public MysqlDatabaseSync() {
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(
+ String.format(
+ "jdbc:mysql://%s:%d?useInformationSchema=true",
+ config.get(MySqlSourceOptions.HOSTNAME),
+ config.get(MySqlSourceOptions.PORT)),
+ config.get(MySqlSourceOptions.USERNAME),
+ config.get(MySqlSourceOptions.PASSWORD));
+ }
+
+ @Override
+ public List<SourceSchema> getSchemaList() throws Exception {
+ String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+ List<SourceSchema> schemaList = new ArrayList<>();
+ try (Connection conn = getConnection()) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ try (ResultSet tables =
+ metaData.getTables(databaseName, null, "%", new String[]{"TABLE"})) {
+ while (tables.next()) {
+ String tableName = tables.getString("TABLE_NAME");
+ String tableComment = tables.getString("REMARKS");
+ if (!isSyncNeeded(tableName)) {
+ continue;
+ }
+ SourceSchema sourceSchema =
+ new SourceSchema(metaData, databaseName, tableName, tableComment);
+ if (sourceSchema.primaryKeys.size() > 0) {
+ //Only sync tables with primary keys
+ schemaList.add(sourceSchema);
+ } else {
+ LOG.warn("table {} has no primary key, skip", tableName);
+ System.out.println("table " + tableName + " has no primary key, skip.");
+ }
+ }
+ }
+ }
+ return schemaList;
+ }
+
+ @Override
+ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+ MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
+
+ String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+ Preconditions.checkNotNull(databaseName, "database-name in mysql is required");
+ String tableName = config.get(MySqlSourceOptions.TABLE_NAME);
+ sourceBuilder
+ .hostname(config.get(MySqlSourceOptions.HOSTNAME))
+ .port(config.get(MySqlSourceOptions.PORT))
+ .username(config.get(MySqlSourceOptions.USERNAME))
+ .password(config.get(MySqlSourceOptions.PASSWORD))
+ .databaseList(databaseName)
+ .tableList(databaseName + "." + tableName);
+
+ config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
+ config
+ .getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
+ .ifPresent(sourceBuilder::serverTimeZone);
+ config
+ .getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
+ .ifPresent(sourceBuilder::fetchSize);
+ config
+ .getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
+ .ifPresent(sourceBuilder::connectTimeout);
+ config
+ .getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
+ .ifPresent(sourceBuilder::connectMaxRetries);
+ config
+ .getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
+ .ifPresent(sourceBuilder::connectionPoolSize);
+ config
+ .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
+ .ifPresent(sourceBuilder::heartbeatInterval);
+ config
+ .getOptional(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED)
+ .ifPresent(sourceBuilder::scanNewlyAddedTableEnabled);
+
+ String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
+ if ("initial".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(StartupOptions.initial());
+ } else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(StartupOptions.earliest());
+ } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(StartupOptions.latest());
+ } else if ("specific-offset".equalsIgnoreCase(startupMode)) {
+ BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
+ String file = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+ Long pos = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+ if (file != null && pos != null) {
+ offsetBuilder.setBinlogFilePosition(file, pos);
+ }
+ config
+ .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
+ .ifPresent(offsetBuilder::setGtidSet);
+ config
+ .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+ .ifPresent(offsetBuilder::setSkipEvents);
+ config
+ .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+ .ifPresent(offsetBuilder::setSkipRows);
+ sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
+ } else if ("timestamp".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(
+ StartupOptions.timestamp(
+ config.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+ }
+
+ Properties jdbcProperties = new Properties();
+ Properties debeziumProperties = new Properties();
+ //date to string
+ debeziumProperties.putAll(DateToStringConverter.DEFAULT_PROPS);
+
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
+ jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+ } else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+ debeziumProperties.put(
+ key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+ }
+ }
+ sourceBuilder.jdbcProperties(jdbcProperties);
+ sourceBuilder.debeziumProperties(debeziumProperties);
+
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+ JsonDebeziumDeserializationSchema schema =
+ new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+ MySqlSource<String> mySqlSource = sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
+
+ DataStreamSource<String> streamSource = env.fromSource(
+ mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+ return streamSource;
+ }
+}
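The startup mode is taken from scan.startup.mode and mapped onto the MySqlSource StartupOptions shown above. For resuming from a known binlog position, the relevant mysql-conf entries would look roughly like the sketch below; the file name and position are placeholders, and the key names are assumed to follow MySqlSourceOptions in flink-cdc 2.3:

import java.util.HashMap;
import java.util.Map;

public class SpecificOffsetConfSketch {
    public static void main(String[] args) {
        Map<String, String> mysqlMap = new HashMap<>();
        mysqlMap.put("scan.startup.mode", "specific-offset");
        mysqlMap.put("scan.startup.specific-offset.file", "mysql-bin.000003");
        mysqlMap.put("scan.startup.specific-offset.pos", "4");
        // Passed as --mysql-conf key=value pairs (or via Configuration.fromMap),
        // buildCdcSource turns these into StartupOptions.specificOffset(...).
    }
}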
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
new file mode 100644
index 0000000..92325ac
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.flink.util.Preconditions;
+
+public class MysqlType {
+ private static final String BIT = "BIT";
+ private static final String BOOLEAN = "BOOLEAN";
+ private static final String BOOL = "BOOL";
+ private static final String TINYINT = "TINYINT";
+ private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
+ private static final String SMALLINT = "SMALLINT";
+ private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL";
+ private static final String MEDIUMINT = "MEDIUMINT";
+ private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+ private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL";
+ private static final String INT = "INT";
+ private static final String INT_UNSIGNED = "INT UNSIGNED";
+ private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
+ private static final String BIGINT = "BIGINT";
+ private static final String SERIAL = "SERIAL";
+ private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL";
+ private static final String REAL = "REAL";
+ private static final String REAL_UNSIGNED = "REAL UNSIGNED";
+ private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL";
+ private static final String FLOAT = "FLOAT";
+ private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL";
+ private static final String DOUBLE = "DOUBLE";
+ private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+ private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL";
+ private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
+ private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED";
+ private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
+ "DOUBLE PRECISION UNSIGNED ZEROFILL";
+ private static final String NUMERIC = "NUMERIC";
+ private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
+ private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL";
+ private static final String FIXED = "FIXED";
+ private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
+ private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL";
+ private static final String DECIMAL = "DECIMAL";
+ private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL";
+ private static final String CHAR = "CHAR";
+ private static final String VARCHAR = "VARCHAR";
+ private static final String TINYTEXT = "TINYTEXT";
+ private static final String MEDIUMTEXT = "MEDIUMTEXT";
+ private static final String TEXT = "TEXT";
+ private static final String LONGTEXT = "LONGTEXT";
+ private static final String DATE = "DATE";
+ private static final String TIME = "TIME";
+ private static final String DATETIME = "DATETIME";
+ private static final String TIMESTAMP = "TIMESTAMP";
+ private static final String YEAR = "YEAR";
+ private static final String BINARY = "BINARY";
+ private static final String VARBINARY = "VARBINARY";
+ private static final String TINYBLOB = "TINYBLOB";
+ private static final String MEDIUMBLOB = "MEDIUMBLOB";
+ private static final String BLOB = "BLOB";
+ private static final String LONGBLOB = "LONGBLOB";
+ private static final String JSON = "JSON";
+ private static final String ENUM = "ENUM";
+
+ public static String toDorisType(String type, Integer length, Integer scale) {
+ switch (type.toUpperCase()) {
+ case BIT:
+ case BOOLEAN:
+ case BOOL:
+ return DorisType.BOOLEAN;
+ case TINYINT:
+ return DorisType.TINYINT;
+ case TINYINT_UNSIGNED:
+ case TINYINT_UNSIGNED_ZEROFILL:
+ case SMALLINT:
+ return DorisType.SMALLINT;
+ case SMALLINT_UNSIGNED:
+ case SMALLINT_UNSIGNED_ZEROFILL:
+ case INT:
+ case MEDIUMINT:
+ case YEAR:
+ return DorisType.INT;
+ case INT_UNSIGNED:
+ case INT_UNSIGNED_ZEROFILL:
+ case MEDIUMINT_UNSIGNED:
+ case MEDIUMINT_UNSIGNED_ZEROFILL:
+ case BIGINT:
+ return DorisType.BIGINT;
+ case BIGINT_UNSIGNED:
+ case BIGINT_UNSIGNED_ZEROFILL:
+ // BIGINT UNSIGNED can exceed the signed 64-bit range, so widen it to Doris LARGEINT
+ return DorisType.LARGEINT;
+ case FLOAT:
+ case FLOAT_UNSIGNED:
+ case FLOAT_UNSIGNED_ZEROFILL:
+ return DorisType.FLOAT;
+ case REAL:
+ case REAL_UNSIGNED:
+ case REAL_UNSIGNED_ZEROFILL:
+ case DOUBLE:
+ case DOUBLE_UNSIGNED:
+ case DOUBLE_UNSIGNED_ZEROFILL:
+ case DOUBLE_PRECISION:
+ case DOUBLE_PRECISION_UNSIGNED:
+ case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
+ return DorisType.DOUBLE;
+ case NUMERIC:
+ case NUMERIC_UNSIGNED:
+ case NUMERIC_UNSIGNED_ZEROFILL:
+ case FIXED:
+ case FIXED_UNSIGNED:
+ case FIXED_UNSIGNED_ZEROFILL:
+ case DECIMAL:
+ case DECIMAL_UNSIGNED:
+ case DECIMAL_UNSIGNED_ZEROFILL:
+ // Doris DECIMALV3 supports a precision of at most 38; wider decimals fall back to STRING
+ return length != null && length <= 38
+ ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, length, scale != null && scale >= 0 ? scale : 0)
+ : DorisType.STRING;
+ case DATE:
+ return DorisType.DATE_V2;
+ case DATETIME:
+ case TIMESTAMP:
+ // DATETIMEV2 precision is capped at microseconds (6 fractional digits)
+ return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(length == null ? 0 : length, 6));
+ case CHAR:
+ Preconditions.checkNotNull(length);
+ return String.format("%s(%s)", DorisType.CHAR, length);
+ case VARCHAR:
+ Preconditions.checkNotNull(length);
+ // MySQL VARCHAR length counts characters while Doris counts bytes, so reserve 3 bytes per
+ // character; anything over the 65533-byte VARCHAR limit falls back to STRING
+ return length * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 3);
+ case TINYTEXT:
+ case TEXT:
+ case MEDIUMTEXT:
+ case LONGTEXT:
+ case ENUM:
+ case TIME:
+ case TINYBLOB:
+ case BLOB:
+ case MEDIUMBLOB:
+ case LONGBLOB:
+ case BINARY:
+ case VARBINARY:
+ return DorisType.STRING;
+ case JSON:
+ return DorisType.JSONB;
+ default:
+ throw new UnsupportedOperationException("Unsupported MySQL Type: " + type);
+ }
+
+ }
+}
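Since toDorisType is a pure string-to-string mapping, its behaviour is easy to sanity-check in isolation. The sketch below shows the expected conversions as a plain main() rather than a unit test so it stays self-contained; the class name is illustrative and the printed Doris type names depend on the DorisType constants defined elsewhere in this commit.

import org.apache.doris.flink.tools.cdc.mysql.MysqlType;

public class MysqlTypeMappingSketch {
    public static void main(String[] args) {
        // unsigned BIGINT may exceed the signed 64-bit range, so it widens to LARGEINT
        System.out.println(MysqlType.toDorisType("BIGINT UNSIGNED", null, null));
        // DECIMAL(27,9) fits within DECIMALV3's 38-digit precision limit
        System.out.println(MysqlType.toDorisType("DECIMAL", 27, 9));
        // MySQL VARCHAR(100) becomes a byte-sized Doris VARCHAR(300)
        System.out.println(MysqlType.toDorisType("VARCHAR", 100, null));
        // DATETIME precision is capped at 6 fractional digits
        System.out.println(MysqlType.toDorisType("DATETIME", 3, null));
    }
}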
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
new file mode 100644
index 0000000..563c848
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Splits the single MySQL CDC json stream into per-table side outputs, keyed by the
+ * Doris table name produced by the configured TableNameConverter.
+ */
+public class ParsingProcessFunction extends ProcessFunction<String, Void> {
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private transient Map<String, OutputTag<String>> recordOutputTags;
+ private DatabaseSync.TableNameConverter converter;
+
+ public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
+ this.converter = converter;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ recordOutputTags = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector) throws Exception {
+ JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+ String tableName = extractJsonNode(recordRoot.get("source"), "table");
+ String dorisName = converter.convert(tableName);
+ context.output(getRecordOutputTag(dorisName), record);
+ }
+
+ private String extractJsonNode(JsonNode record, String key) {
+ return record != null && record.get(key) != null ? record.get(key).asText() : null;
+ }
+
+ private OutputTag<String> getRecordOutputTag(String tableName) {
+ return recordOutputTags.computeIfAbsent(
+ tableName, ParsingProcessFunction::createRecordOutputTag);
+ }
+
+ public static OutputTag<String> createRecordOutputTag(String tableName) {
+ return new OutputTag<String>("record-" + tableName) {
+ };
+ }
+}
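ParsingProcessFunction emits nothing on its main output; every record goes to a side output tagged with the converted table name, which is how DatabaseSync later fans records out to one Doris sink per table. A hedged sketch of wiring it up and reading one table's side output follows; the (prefix, suffix) constructor of DatabaseSync.TableNameConverter is assumed from how DatabaseSync is invoked, and the sample record and class name are fabricated for illustration.

import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SideOutputRoutingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // stand-in for the MySQL CDC stream: a single Debezium-style record for table tbl1
        DataStream<String> cdcStream = env.fromElements(
                "{\"source\":{\"db\":\"db1\",\"table\":\"tbl1\"},\"op\":\"c\"}");
        // assumed (prefix, suffix) constructor; empty strings keep the original table name
        SingleOutputStreamOperator<Void> parsed = cdcStream.process(
                new ParsingProcessFunction(new DatabaseSync.TableNameConverter("", "")));
        DataStream<String> tbl1Records = parsed.getSideOutput(
                ParsingProcessFunction.createRecordOutputTag("tbl1"));
        tbl1Records.print();
        env.execute("side-output routing sketch");
    }
}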
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
index bdc0584..3bad9be 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -18,16 +18,17 @@
package org.apache.doris.flink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.utils.DateToStringConverter;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.utils.DateToStringConverter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.kafka.connect.json.JsonConverterConfig;
import java.util.HashMap;
import java.util.Map;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
new file mode 100644
index 0000000..cbd3b38
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcMysqlSyncDatabaseCase {
+
+ public static void main(String[] args) throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// env.setParallelism(1);
+
+ Map<String, String> flinkMap = new HashMap<>();
+ flinkMap.put("execution.checkpointing.interval", "10s");
+ flinkMap.put("pipeline.operator-chaining", "false");
+ flinkMap.put("parallelism.default", "1");
+
+ Configuration configuration = Configuration.fromMap(flinkMap);
+ env.configure(configuration);
+
+ String database = "db1";
+ String tablePrefix = "";
+ String tableSuffix = "";
+ Map<String, String> mysqlConfig = new HashMap<>();
+ mysqlConfig.put("database-name", "db1");
+ mysqlConfig.put("hostname", "127.0.0.1");
+ mysqlConfig.put("port", "3306");
+ mysqlConfig.put("username", "root");
+ mysqlConfig.put("password", "");
+ Configuration config = Configuration.fromMap(mysqlConfig);
+
+ Map<String, String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes", "127.0.0.1:8030");
+ sinkConfig.put("username", "root");
+ sinkConfig.put("password", "");
+ sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+
+ String includingTables = "tbl1|tbl2|tbl3";
+ String excludingTables = "";
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConf, tableConfig);
+ databaseSync.build();
+ env.execute(String.format("MySQL-Doris Database Sync: %s", database));
+
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
index 9d73f53..1f1fe61 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
@@ -17,9 +17,9 @@
package org.apache.doris.flink.utils;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
-import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;