You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/07/19 03:21:17 UTC

[doris-flink-connector] branch master updated: [Feature ]add oracle database sync (#156)

This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ce5328  [Feature ]add oracle database sync (#156)
2ce5328 is described below

commit 2ce53280014f8e31a9089135ac0da72b84b674b8
Author: wudi <67...@qq.com>
AuthorDate: Wed Jul 19 11:21:12 2023 +0800

    [Feature ]add oracle database sync (#156)
    
    1. Add Oracle whole-database synchronization; the overall process is the same as in #141.
    The example is as follows:
    ```shell
    bin/flink run \
         -Dexecution.checkpointing.interval=10s\
         -Dparallelism.default=1\
         -c org.apache.doris.flink.tools.cdc.CdcTools\
         ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\
         oracle-sync-database \
         --database db1\
         --oracle-conf hostname=127.0.0.1 \
         --oracle-conf port=1521 \
         --oracle-conf username=admin \
         --oracle-conf password="admin123" \
         --oracle-conf database-name=XE \
         --oracle-conf schema-name=ADMIN \
         --including-tables "tbl1|tbl2" \
         --sink-conf fenodes=127.0.0.1:8030 \
         --sink-conf username=root \
         --sink-conf password=\
         --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
         --sink-conf sink.label-prefix=label \
         --sink-conf sink.enable-2pc=false \
         --table-conf replication_num=1
    ```
    Here, `oracle-conf` holds the configuration of the Flink CDC Oracle connector, which is documented [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html); the other options are the same as for MySQL (#141).
    
    2. Add the parameter `--create-table-only`; when it is specified, only the tables are created and no job is submitted.
---
 flink-doris-connector/pom.xml                      |  12 +-
 .../doris/flink/catalog/doris/DorisSystem.java     |  10 +-
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  34 +++-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  14 +-
 .../apache/doris/flink/tools/cdc/SourceSchema.java |  13 +-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   |  13 +-
 .../doris/flink/tools/cdc/mysql/MysqlSchema.java   |  32 ++++
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 191 +++++++++++++++++++++
 .../tools/cdc/oracle/OracleDateConverter.java      | 134 +++++++++++++++
 .../doris/flink/tools/cdc/oracle/OracleSchema.java |  33 ++++
 .../doris/flink/tools/cdc/oracle/OracleType.java   | 103 +++++++++++
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |   2 +-
 ...seCase.java => CdcOraclelSyncDatabaseCase.java} |  50 +++---
 13 files changed, 588 insertions(+), 53 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 69ddf9c..b637fef 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -67,9 +67,9 @@ under the License.
     </mailingLists>
 
     <properties>
-        <revision>1.4.0-SNAPSHOT</revision>
-        <flink.version>1.15.0</flink.version>
-        <flink.minor.version>1.15</flink.minor.version>
+        <revision>1.5.0-SNAPSHOT</revision>
+        <flink.version>1.16.0</flink.version>
+        <flink.minor.version>1.16</flink.minor.version>
         <libthrift.version>0.16.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
@@ -248,6 +248,12 @@ under the License.
             <version>2.3.0</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
+            <version>2.3.0</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web</artifactId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 0a0f5e9..3e8612e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -213,10 +213,18 @@ public class DorisSystem {
                 .append(" ")
                 .append(field.getTypeString())
                 .append(" COMMENT '")
-                .append(field.getComment() == null ? "" : field.getComment())
+                .append(quoteComment(field.getComment()))
                 .append("',");
     }
 
+    private String quoteComment(String comment){
+        if(comment == null){
+            return "";
+        } else {
+            return comment.replaceAll("'","\\\\'");
+        }
+    }
+
     private List<String> identifier(List<String> name) {
         List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
         return result;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 809c4ea..85790d9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -18,6 +18,7 @@ package org.apache.doris.flink.tools.cdc;
 
 
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -33,6 +34,7 @@ import java.util.Map;
  */
 public class CdcTools {
     private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
+    private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
     private static final List<String> EMPTY_KEYS = Arrays.asList("password");
 
     public static void main(String[] args) throws Exception {
@@ -43,6 +45,9 @@ public class CdcTools {
             case MYSQL_SYNC_DATABASE:
                 createMySQLSyncDatabase(opArgs);
                 break;
+            case ORACLE_SYNC_DATABASE:
+                createOracleSyncDatabase(opArgs);
+                break;
             default:
                 System.out.println("Unknown operation " + operation);
                 System.exit(1);
@@ -51,27 +56,38 @@ public class CdcTools {
 
     private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
         MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
+        Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        syncDatabase(params, databaseSync, mysqlConfig, "MySQL");
+    }
+
+    private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
+        Configuration oracleConfig = Configuration.fromMap(oracleMap);
+        DatabaseSync databaseSync = new OracleDatabaseSync();
+        syncDatabase(params, databaseSync, oracleConfig, "Oracle");
+    }
+
+    private static void syncDatabase(MultipleParameterTool params, DatabaseSync databaseSync, Configuration config, String type) throws Exception {
         String jobName = params.get("job-name");
         String database = params.get("database");
         String tablePrefix = params.get("table-prefix");
         String tableSuffix = params.get("table-suffix");
         String includingTables = params.get("including-tables");
         String excludingTables = params.get("excluding-tables");
+        boolean createTableOnly = params.has("create-table-only");
 
-        Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
         Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
         Map<String, String> tableMap = getConfigMap(params, "table-conf");
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync.create(env, database, mysqlConfig, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap, createTableOnly);
         databaseSync.build();
-
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
-            jobName = String.format("MySQL-Doris Sync Database: %s", mysqlMap.get("database-name"));
+            jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
         }
         env.execute(jobName);
     }
@@ -83,7 +99,7 @@ public class CdcTools {
 
         Map<String, String> map = new HashMap<>();
         for (String param : params.getMultiParameter(key)) {
-            String[] kv = param.split("=");
+            String[] kv = param.split("=", 2);
             if (kv.length == 2) {
                 map.put(kv[0], kv[1]);
                 continue;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 1699d52..799eff1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -16,7 +16,6 @@
 // under the License.
 package org.apache.doris.flink.tools.cdc;
 
-import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisConnectionOptions;
@@ -50,6 +49,7 @@ import java.util.regex.Pattern;
 public abstract class DatabaseSync {
     private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
     private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+    private static final String TABLE_NAME_OPTIONS = "table-name";
     protected Configuration config;
     protected String database;
     protected TableNameConverter converter;
@@ -58,6 +58,7 @@ public abstract class DatabaseSync {
     protected Map<String, String> tableConfig;
     protected Configuration sinkConfig;
     public StreamExecutionEnvironment env;
+    private boolean createTableOnly = false;
 
     public abstract Connection getConnection() throws SQLException;
 
@@ -65,10 +66,9 @@ public abstract class DatabaseSync {
 
     public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
 
-
     public void create(StreamExecutionEnvironment env, String database, Configuration config,
                        String tablePrefix, String tableSuffix, String includingTables,
-                       String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig) {
+                       String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig, boolean createTableOnly) {
         this.env = env;
         this.config = config;
         this.database = database;
@@ -81,6 +81,7 @@ public abstract class DatabaseSync {
         if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
             this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
         }
+        this.createTableOnly = createTableOnly;
     }
 
     public void build() throws Exception {
@@ -107,8 +108,13 @@ public abstract class DatabaseSync {
             }
             dorisTables.add(dorisTable);
         }
+        if(createTableOnly){
+            System.out.println("Create table finished.");
+            System.exit(0);
+        }
+
         Preconditions.checkState(!syncTables.isEmpty(), "No tables to be synchronized.");
-        config.set(MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", syncTables) + ")");
+        config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
 
         DataStreamSource<String> streamSource = buildCdcSource(env);
         SingleOutputStreamOperator<Void> parsedStream = streamSource.process(new ParsingProcessFunction(converter));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 03aa90e..6fdbeac 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink.tools.cdc;
 import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
 
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
@@ -28,7 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SourceSchema {
+public abstract class SourceSchema {
     private final String databaseName;
     private final String tableName;
     private final String tableComment;
@@ -36,14 +35,14 @@ public class SourceSchema {
     public final List<String> primaryKeys;
 
     public SourceSchema(
-            DatabaseMetaData metaData, String databaseName, String tableName, String tableComment)
+            DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment)
             throws Exception {
         this.databaseName = databaseName;
         this.tableName = tableName;
         this.tableComment = tableComment;
 
         fields = new LinkedHashMap<>();
-        try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) {
+        try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 String comment = rs.getString("REMARKS");
@@ -57,13 +56,13 @@ public class SourceSchema {
                 if (rs.wasNull()) {
                     scale = null;
                 }
-                String dorisTypeStr = MysqlType.toDorisType(fieldType, precision, scale);
+                String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
                 fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
             }
         }
 
         primaryKeys = new ArrayList<>();
-        try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
+        try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 primaryKeys.add(fieldName);
@@ -71,6 +70,8 @@ public class SourceSchema {
         }
     }
 
+    public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
+
     public TableSchema convertTableSchema(Map<String, String> tableProps) {
         TableSchema tableSchema = new TableSchema();
         tableSchema.setModel(DataModel.UNIQUE);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 629e6e1..4d6d250 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -21,7 +21,6 @@ import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
@@ -49,8 +48,8 @@ import java.util.Properties;
 
 public class MysqlDatabaseSync extends DatabaseSync {
     private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class);
-
     private static String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true";
+    private static String PROPERTIES_PREFIX = "jdbc.properties.";
 
     public MysqlDatabaseSync() {
     }
@@ -80,7 +79,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
                         continue;
                     }
                     SourceSchema sourceSchema =
-                            new SourceSchema(metaData, databaseName, tableName, tableComment);
+                            new MysqlSchema(metaData, databaseName, tableName, tableComment);
                     if (sourceSchema.primaryKeys.size() > 0) {
                         //Only sync tables with primary keys
                         schemaList.add(sourceSchema);
@@ -170,8 +169,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
         for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
             String key = entry.getKey();
             String value = entry.getValue();
-            if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
-                jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+            if (key.startsWith(PROPERTIES_PREFIX)) {
+                jdbcProperties.put(key.substring(PROPERTIES_PREFIX.length()), value);
             } else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
                 debeziumProperties.put(
                         key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
@@ -196,8 +195,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
         for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
             String key = entry.getKey();
             String value = entry.getValue();
-            if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
-                jdbcProps.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+            if (key.startsWith(PROPERTIES_PREFIX)) {
+                jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value);
             }
         }
         return jdbcProps;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
new file mode 100644
index 0000000..ff47fd4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+public class MysqlSchema extends SourceSchema {
+
+    public MysqlSchema(DatabaseMetaData metaData, String databaseName, String tableName, String tableComment) throws Exception {
+        super(metaData, databaseName, null, tableName, tableComment);
+    }
+
+    public String convertToDorisType(String fieldType, Integer precision, Integer scale){
+        return MysqlType.toDorisType(fieldType, precision, scale);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
new file mode 100644
index 0000000..f90f858
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.oracle.OracleSource;
+import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
+import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+
+public class OracleDatabaseSync extends DatabaseSync {
+    private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class);
+
+    private static String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
+
+    public OracleDatabaseSync() {
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        String jdbcUrl;
+        if(!StringUtils.isNullOrWhitespaceOnly(config.get(OracleSourceOptions.URL))){
+            jdbcUrl = config.get(OracleSourceOptions.URL);
+        }else{
+            jdbcUrl = String.format(JDBC_URL, config.get(OracleSourceOptions.HOSTNAME), config.get(OracleSourceOptions.PORT),config.get(OracleSourceOptions.DATABASE_NAME));
+        }
+        Properties pro = new Properties();
+        pro.setProperty("user", config.get(OracleSourceOptions.USERNAME));
+        pro.setProperty("password", config.get(OracleSourceOptions.PASSWORD));
+        pro.put("remarksReporting", "true");
+        return DriverManager.getConnection(jdbcUrl, pro);
+    }
+
+    @Override
+    public List<SourceSchema> getSchemaList() throws Exception {
+        String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+        List<SourceSchema> schemaList = new ArrayList<>();
+        LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+        try (Connection conn = getConnection()) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet tables =
+                         metaData.getTables(databaseName, schemaName, "%", new String[]{"TABLE"})) {
+                while (tables.next()) {
+                    String tableName = tables.getString("TABLE_NAME");
+                    String tableComment = tables.getString("REMARKS");
+                    if (!isSyncNeeded(tableName)) {
+                        continue;
+                    }
+                    SourceSchema sourceSchema =
+                            new OracleSchema(metaData, databaseName, schemaName, tableName, tableComment);
+                    if (sourceSchema.primaryKeys.size() > 0) {
+                        //Only sync tables with primary keys
+                        schemaList.add(sourceSchema);
+                    } else {
+                        LOG.warn("table {} has no primary key, skip", tableName);
+                        System.out.println("table " + tableName + " has no primary key, skip.");
+                    }
+                }
+            }
+        }
+        return schemaList;
+    }
+
+    @Override
+    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+        String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+        Preconditions.checkNotNull(databaseName, "database-name in oracle is required");
+        Preconditions.checkNotNull(schemaName, "schema-name in oracle is required");
+        String tableName = config.get(OracleSourceOptions.TABLE_NAME);
+        String url = config.get(OracleSourceOptions.URL);
+        String hostname = config.get(OracleSourceOptions.HOSTNAME);
+        Integer port = config.get(OracleSourceOptions.PORT);
+        String username = config.get(OracleSourceOptions.USERNAME);
+        String password = config.get(OracleSourceOptions.PASSWORD);
+
+        StartupOptions startupOptions = StartupOptions.initial();
+        String startupMode = config.get(OracleSourceOptions.SCAN_STARTUP_MODE);
+        if ("initial".equalsIgnoreCase(startupMode)) {
+            startupOptions = StartupOptions.initial();
+        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+            startupOptions = StartupOptions.latest();
+        }
+
+        //debezium properties set
+        Properties debeziumProperties = new Properties();
+        debeziumProperties.put("decimal.handling.mode", "string");
+        //date to string
+        debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);
+
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+                debeziumProperties.put(
+                        key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+            }
+        }
+
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+        if(config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)){
+            JdbcIncrementalSource<String> incrSource = OracleSourceBuilder.OracleIncrementalSource.<String>builder()
+                    .hostname(hostname)
+                    .url(url)
+                    .port(port)
+                    .databaseList(databaseName)
+                    .schemaList(schemaName)
+                    .tableList(schemaName + "." + tableName)
+                    .username(username)
+                    .password(password)
+                    .startupOptions(startupOptions)
+                    .deserializer(schema)
+                    .debeziumProperties(debeziumProperties)
+                    .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+                    .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+                    .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+                    .connectTimeout(config.get(CONNECT_TIMEOUT))
+                    .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+                    .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+                    .distributionFactorUpper(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+                    .distributionFactorLower(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+                    .build();
+            return env.fromSource(incrSource,  WatermarkStrategy.noWatermarks(), "Oracle IncrSource");
+        }else{
+            DebeziumSourceFunction<String> oracleSource = OracleSource.<String>builder().url(url)
+                    .hostname(hostname)
+                    .port(port)
+                    .username(username)
+                    .password(password)
+                    .database(databaseName)
+                    .schemaList(schemaName)
+                    .tableList(schemaName + "." + tableName)
+                    .debeziumProperties(debeziumProperties)
+                    .startupOptions(startupOptions)
+                    .deserializer(schema)
+                    .build();
+            return env.addSource(oracleSource, "Oracle Source");
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
new file mode 100644
index 0000000..d7bd8fd
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import oracle.sql.TIMESTAMP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Debezium custom converter that renders Oracle DATE and TIMESTAMP(n) column values as
+ * {@code yyyy-MM-dd HH:mm:ss.SSSSSS} strings. Register it through {@link #DEFAULT_PROPS}.
+ */
+public class OracleDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+    private static final Logger log = LoggerFactory.getLogger(OracleDateConverter.class);
+    private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
+    private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
+    private static final Pattern TIMESTAMP_OR_DATE_REGEX = Pattern.compile("^TIMESTAMP[(]\\d[)]$|^DATE$", Pattern.CASE_INSENSITIVE);
+    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+    private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+    private static final DateTimeFormatter DATETIMEV2_FORMATTER = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+    /** Debezium properties that register this converter under the name {@code oracleDate}. */
+    public static Properties DEFAULT_PROPS = new Properties();
+
+    static {
+        DEFAULT_PROPS.setProperty("converters", "oracleDate");
+        DEFAULT_PROPS.setProperty("oracleDate.type", "org.apache.doris.flink.tools.cdc.oracle.OracleDateConverter");
+    }
+
+    /** Lenient parser: seconds may be followed by an optional fraction of up to nine digits. */
+    private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
+            .parseCaseInsensitive()
+            .appendPattern("yyyy-MM-dd HH:mm:ss")
+            .optionalStart()
+            .appendPattern(".")
+            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
+            .optionalEnd()
+            .toFormatter();
+
+    /** No configuration options are read. */
+    @Override
+    public void configure(Properties props) {
+    }
+
+    /**
+     * Registers a string-typed converter for columns whose type name is DATE or TIMESTAMP(n).
+     * Columns of any other type are left to the default conversion.
+     */
+    @Override
+    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+        if (!TIMESTAMP_OR_DATE_REGEX.matcher(column.typeName()).matches()) {
+            return;
+        }
+        registration.register(SchemaBuilder.string().optional(), value -> convertValue(column, value));
+    }
+
+    /** Converts one raw column value; unsupported Java types and failed conversions yield null. */
+    private Object convertValue(RelationalColumn column, Object value) {
+        if (value == null) {
+            // Only a non-optional column with a declared default substitutes that default.
+            return !column.isOptional() && column.hasDefaultValue() ? column.defaultValue() : null;
+        }
+        if (value instanceof String) {
+            return convertStringTimestamp((String) value);
+        }
+        if (value instanceof Timestamp) {
+            return DATETIMEV2_FORMATTER.format(((Timestamp) value).toLocalDateTime());
+        }
+        if (value instanceof TIMESTAMP) {
+            try {
+                return DATETIMEV2_FORMATTER.format(((TIMESTAMP) value).timestampValue().toLocalDateTime());
+            } catch (SQLException ex) {
+                // Keep the best-effort null result, but log the cause so the failure can be diagnosed.
+                log.error("convert timestamp failed, values is {}", value, ex);
+            }
+        }
+        return null;
+    }
+
+    /** Reformats the literal inside TO_TIMESTAMP('...') or TO_DATE('...', '...'); null if neither form matches. */
+    private String convertStringTimestamp(String data) {
+        // TO_TIMESTAMP is tried first, then TO_DATE; both expose the literal as group 1.
+        Matcher matcher = TO_TIMESTAMP.matcher(data);
+        if (!matcher.matches()) {
+            matcher = TO_DATE.matcher(data);
+            if (!matcher.matches()) {
+                return null;
+            }
+        }
+        String literal = completeMilliseconds(matcher.group(1).trim());
+        return DATETIMEV2_FORMATTER.format(LocalDateTime.from(TIMESTAMP_FORMATTER.parse(literal)));
+    }
+
+    /** Pads the text with a '.' (when it stops at the seconds) and zeros up to microsecond width. */
+    private String completeMilliseconds(String stringValue) {
+        if (stringValue.length() >= DATETIMEV2_PATTERN.length()) {
+            return stringValue;
+        }
+        StringBuilder sb = new StringBuilder(stringValue);
+        if (stringValue.length() == DATETIME_PATTERN.length()) {
+            sb.append('.');
+        }
+        while (sb.length() < DATETIMEV2_PATTERN.length()) {
+            sb.append('0');
+        }
+        return sb.toString();
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
new file mode 100644
index 0000000..464c9a3
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+/** {@link SourceSchema} variant for Oracle tables; column types are mapped through {@link OracleType}. */
+public class OracleSchema extends SourceSchema {
+
+    public OracleSchema(
+            DatabaseMetaData metaData,
+            String databaseName,
+            String schemaName,
+            String tableName,
+            String tableComment) throws Exception {
+        super(metaData, databaseName, schemaName, tableName, tableComment);
+    }
+
+    /** Delegates the mapping to {@link OracleType#toDorisType(String, Integer, Integer)}. */
+    @Override
+    public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+        final String dorisType = OracleType.toDorisType(fieldType, precision, scale);
+        return dorisType;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java
new file mode 100644
index 0000000..2024691
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.flink.util.Preconditions;
+
+/** Maps Oracle column type names, together with their precision and scale, onto Doris column types. */
+public class OracleType {
+    private static final String VARCHAR2 = "VARCHAR2";
+    private static final String NVARCHAR2 = "NVARCHAR2";
+    private static final String NUMBER = "NUMBER";
+    private static final String FLOAT = "FLOAT";
+    private static final String LONG = "LONG";
+    private static final String DATE = "DATE";
+    private static final String BINARY_FLOAT = "BINARY_FLOAT";
+    private static final String BINARY_DOUBLE = "BINARY_DOUBLE";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String INTERVAL = "INTERVAL";
+    private static final String RAW = "RAW";
+    private static final String LONG_RAW = "LONG RAW";
+    private static final String ROWID = "ROWID";
+    private static final String UROWID = "UROWID";
+    private static final String CHAR = "CHAR";
+    private static final String NCHAR = "NCHAR";
+    private static final String CLOB = "CLOB";
+    private static final String NCLOB = "NCLOB";
+    private static final String BLOB = "BLOB";
+    private static final String BFILE = "BFILE";
+
+    /**
+     * Translates one Oracle column type into the Doris type string.
+     *
+     * @param oracleType Oracle type name, matched case-insensitively; names starting with
+     *     INTERVAL or TIMESTAMP are matched by that prefix
+     * @param precision column size; must be non-null for CHAR/VARCHAR2/NCHAR/NVARCHAR2, and a
+     *     NUMBER with null precision is mapped to STRING
+     * @param scale number of decimal digits; only used for NUMBER, where null counts as 0
+     * @return the Doris type, including its length or precision arguments where applicable
+     * @throws UnsupportedOperationException if the type has no mapping here (for example BFILE,
+     *     BINARY_FLOAT, BINARY_DOUBLE, ROWID, UROWID)
+     */
+    public static String toDorisType(String oracleType, Integer precision, Integer scale) {
+        // Locale.ROOT keeps the keyword comparison independent of the JVM default locale.
+        oracleType = oracleType.toUpperCase(java.util.Locale.ROOT);
+        if (oracleType.startsWith(INTERVAL)) {
+            // Collapse every "INTERVAL ..." variant onto the bare keyword handled by the switch.
+            oracleType = INTERVAL;
+        } else if (oracleType.startsWith(TIMESTAMP)) {
+            return String.format("%s(%s)", DorisType.DATETIME_V2, 6);
+        }
+        switch (oracleType) {
+            case NUMBER:
+                return numberToDorisType(precision, scale);
+            case FLOAT:
+                return DorisType.DOUBLE;
+            case DATE:
+                // can save date and time with second precision
+                return DorisType.DATETIME_V2;
+            case CHAR:
+            case VARCHAR2:
+            case NCHAR:
+            case NVARCHAR2:
+                Preconditions.checkNotNull(precision);
+                // The declared length is tripled; results above 65533 fall back to STRING.
+                return precision * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
+            case LONG:
+            case RAW:
+            case LONG_RAW:
+            case INTERVAL:
+            case BLOB:
+            case CLOB:
+            case NCLOB:
+                return DorisType.STRING;
+            case BFILE:
+            case BINARY_FLOAT:
+            case BINARY_DOUBLE:
+            case ROWID:
+            case UROWID:
+            default:
+                throw new UnsupportedOperationException("Unsupported Oracle Type: " + oracleType);
+        }
+    }
+
+    /**
+     * Maps an Oracle NUMBER. With a scale of zero or less the result is an integer type chosen by
+     * digit count; with a positive scale it is a decimal of at most 38 digits. Anything wider, or
+     * a NUMBER whose precision is unknown (null), becomes STRING.
+     */
+    private static String numberToDorisType(Integer precision, Integer scale) {
+        if (precision == null) {
+            // Previously this path failed with a NullPointerException while unboxing.
+            return DorisType.STRING;
+        }
+        int decimals = scale == null ? 0 : scale;
+        if (decimals > 0) {
+            // The declared precision may be smaller than the scale; widen it to fit.
+            int digits = Math.max(precision, decimals);
+            return digits <= 38
+                    ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, digits, decimals)
+                    : DorisType.STRING;
+        }
+        // A negative scale widens the integer part, so size by precision - scale digits.
+        int digits = precision - decimals;
+        if (digits < 3) {
+            return DorisType.TINYINT;
+        } else if (digits < 5) {
+            return DorisType.SMALLINT;
+        } else if (digits < 10) {
+            return DorisType.INT;
+        } else if (digits < 19) {
+            return DorisType.BIGINT;
+        } else if (digits < 39) {
+            // LARGEINT supports up to 38 digits.
+            return DorisType.LARGEINT;
+        }
+        return DorisType.STRING;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index cbd3b38..c940477 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -65,7 +65,7 @@ public class CdcMysqlSyncDatabaseCase {
         String includingTables = "tbl1|tbl2|tbl3";
         String excludingTables = "";
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig);
+        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig, false);
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
similarity index 57%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
copy to flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index cbd3b38..23610ab 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -16,7 +16,7 @@
 // under the License.
 package org.apache.doris.flink.tools.cdc;
 
-import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -24,32 +24,38 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
-public class CdcMysqlSyncDatabaseCase {
+public class CdcOraclelSyncDatabaseCase {
 
     public static void main(String[] args) throws Exception{
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-//        env.setParallelism(1);
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.enableCheckpointing(10000);
 
-        Map<String,String> flinkMap = new HashMap<>();
-        flinkMap.put("execution.checkpointing.interval","10s");
-        flinkMap.put("pipeline.operator-chaining","false");
-        flinkMap.put("parallelism.default","1");
-
-
-        Configuration configuration = Configuration.fromMap(flinkMap);
-        env.configure(configuration);
+//        Map<String,String> flinkMap = new HashMap<>();
+//        flinkMap.put("execution.checkpointing.interval","10s");
+//        flinkMap.put("pipeline.operator-chaining","false");
+//        flinkMap.put("parallelism.default","1");
+//
+//
+//        Configuration configuration = Configuration.fromMap(flinkMap);
+//        env.configure(configuration);
 
         String database = "db1";
         String tablePrefix = "";
         String tableSuffix = "";
-        Map<String,String> mysqlConfig = new HashMap<>();
-        mysqlConfig.put("database-name","db1");
-        mysqlConfig.put("hostname","127.0.0.1");
-        mysqlConfig.put("port","3306");
-        mysqlConfig.put("username","root");
-        mysqlConfig.put("password","");
-        Configuration config = Configuration.fromMap(mysqlConfig);
+        Map<String,String> sourceConfig = new HashMap<>();
+        sourceConfig.put("database-name","XE");
+        sourceConfig.put("schema-name","ADMIN");
+        sourceConfig.put("hostname","127.0.0.1");
+        sourceConfig.put("port","1521");
+        sourceConfig.put("username","admin");
+        sourceConfig.put("password","");
+//        sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+        sourceConfig.put("debezium.log.mining.strategy","online_catalog");
+        sourceConfig.put("debezium.log.mining.continuous.mine","true");
+        Configuration config = Configuration.fromMap(sourceConfig);
 
         Map<String,String> sinkConfig = new HashMap<>();
         sinkConfig.put("fenodes","127.0.0.1:8030");
@@ -62,12 +68,12 @@ public class CdcMysqlSyncDatabaseCase {
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "tbl1|tbl2|tbl3";
+        String includingTables = "test.*";
         String excludingTables = "";
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig);
+        DatabaseSync databaseSync = new OracleDatabaseSync();
+        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig, false);
         databaseSync.build();
-        env.execute(String.format("MySQL-Doris Database Sync: %s", database));
+        env.execute(String.format("Oracle-Doris Database Sync: %s", database));
 
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org