You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/07/19 03:21:17 UTC
[doris-flink-connector] branch master updated: [Feature ]add oracle database sync (#156)
This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 2ce5328 [Feature ]add oracle database sync (#156)
2ce5328 is described below
commit 2ce53280014f8e31a9089135ac0da72b84b674b8
Author: wudi <67...@qq.com>
AuthorDate: Wed Jul 19 11:21:12 2023 +0800
[Feature ]add oracle database sync (#156)
1. Add Oracle whole-database synchronization; the overall process is the same as in #141.
The example is as follows:
```shell
bin/flink run \
-Dexecution.checkpointing.interval=10s\
-Dparallelism.default=1\
-c org.apache.doris.flink.tools.cdc.CdcTools\
./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\
oracle-sync-database \
--database db1\
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="admin123" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--sink-conf sink.enable-2pc=false \
--table-conf replication_num=1
```
Among them, `oracle-conf` holds the configuration for the Flink CDC Oracle connector, which is documented [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html); the other configurations are the same as for MySQL in #141.
2. Add the parameter `--create-table-only`; when it is specified, only the Doris tables are created and no synchronization job is submitted.
---
flink-doris-connector/pom.xml | 12 +-
.../doris/flink/catalog/doris/DorisSystem.java | 10 +-
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 34 +++-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 14 +-
.../apache/doris/flink/tools/cdc/SourceSchema.java | 13 +-
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 13 +-
.../doris/flink/tools/cdc/mysql/MysqlSchema.java | 32 ++++
.../flink/tools/cdc/oracle/OracleDatabaseSync.java | 191 +++++++++++++++++++++
.../tools/cdc/oracle/OracleDateConverter.java | 134 +++++++++++++++
.../doris/flink/tools/cdc/oracle/OracleSchema.java | 33 ++++
.../doris/flink/tools/cdc/oracle/OracleType.java | 103 +++++++++++
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 2 +-
...seCase.java => CdcOraclelSyncDatabaseCase.java} | 50 +++---
13 files changed, 588 insertions(+), 53 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 69ddf9c..b637fef 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -67,9 +67,9 @@ under the License.
</mailingLists>
<properties>
- <revision>1.4.0-SNAPSHOT</revision>
- <flink.version>1.15.0</flink.version>
- <flink.minor.version>1.15</flink.minor.version>
+ <revision>1.5.0-SNAPSHOT</revision>
+ <flink.version>1.16.0</flink.version>
+ <flink.minor.version>1.16</flink.minor.version>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
@@ -248,6 +248,12 @@ under the License.
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-sql-connector-oracle-cdc</artifactId>
+ <version>2.3.0</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 0a0f5e9..3e8612e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -213,10 +213,18 @@ public class DorisSystem {
.append(" ")
.append(field.getTypeString())
.append(" COMMENT '")
- .append(field.getComment() == null ? "" : field.getComment())
+ .append(quoteComment(field.getComment()))
.append("',");
}
+ private String quoteComment(String comment){
+ if(comment == null){
+ return "";
+ } else {
+ return comment.replaceAll("'","\\\\'");
+ }
+ }
+
private List<String> identifier(List<String> name) {
List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
return result;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 809c4ea..85790d9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -18,6 +18,7 @@ package org.apache.doris.flink.tools.cdc;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -33,6 +34,7 @@ import java.util.Map;
*/
public class CdcTools {
private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
+ private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
private static final List<String> EMPTY_KEYS = Arrays.asList("password");
public static void main(String[] args) throws Exception {
@@ -43,6 +45,9 @@ public class CdcTools {
case MYSQL_SYNC_DATABASE:
createMySQLSyncDatabase(opArgs);
break;
+ case ORACLE_SYNC_DATABASE:
+ createOracleSyncDatabase(opArgs);
+ break;
default:
System.out.println("Unknown operation " + operation);
System.exit(1);
@@ -51,27 +56,38 @@ public class CdcTools {
private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
+ Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ syncDatabase(params, databaseSync, mysqlConfig, "MySQL");
+ }
+
+ private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
+ Configuration oracleConfig = Configuration.fromMap(oracleMap);
+ DatabaseSync databaseSync = new OracleDatabaseSync();
+ syncDatabase(params, databaseSync, oracleConfig, "Oracle");
+ }
+
+ private static void syncDatabase(MultipleParameterTool params, DatabaseSync databaseSync, Configuration config, String type) throws Exception {
String jobName = params.get("job-name");
String database = params.get("database");
String tablePrefix = params.get("table-prefix");
String tableSuffix = params.get("table-suffix");
String includingTables = params.get("including-tables");
String excludingTables = params.get("excluding-tables");
+ boolean createTableOnly = params.has("create-table-only");
- Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
Configuration sinkConfig = Configuration.fromMap(sinkMap);
- DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync.create(env, database, mysqlConfig, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap, createTableOnly);
databaseSync.build();
-
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
- jobName = String.format("MySQL-Doris Sync Database: %s", mysqlMap.get("database-name"));
+ jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
}
env.execute(jobName);
}
@@ -83,7 +99,7 @@ public class CdcTools {
Map<String, String> map = new HashMap<>();
for (String param : params.getMultiParameter(key)) {
- String[] kv = param.split("=");
+ String[] kv = param.split("=", 2);
if (kv.length == 2) {
map.put(kv[0], kv[1]);
continue;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 1699d52..799eff1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -16,7 +16,6 @@
// under the License.
package org.apache.doris.flink.tools.cdc;
-import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
@@ -50,6 +49,7 @@ import java.util.regex.Pattern;
public abstract class DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+ private static final String TABLE_NAME_OPTIONS = "table-name";
protected Configuration config;
protected String database;
protected TableNameConverter converter;
@@ -58,6 +58,7 @@ public abstract class DatabaseSync {
protected Map<String, String> tableConfig;
protected Configuration sinkConfig;
public StreamExecutionEnvironment env;
+ private boolean createTableOnly = false;
public abstract Connection getConnection() throws SQLException;
@@ -65,10 +66,9 @@ public abstract class DatabaseSync {
public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
-
public void create(StreamExecutionEnvironment env, String database, Configuration config,
String tablePrefix, String tableSuffix, String includingTables,
- String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig) {
+ String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig, boolean createTableOnly) {
this.env = env;
this.config = config;
this.database = database;
@@ -81,6 +81,7 @@ public abstract class DatabaseSync {
if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
}
+ this.createTableOnly = createTableOnly;
}
public void build() throws Exception {
@@ -107,8 +108,13 @@ public abstract class DatabaseSync {
}
dorisTables.add(dorisTable);
}
+ if(createTableOnly){
+ System.out.println("Create table finished.");
+ System.exit(0);
+ }
+
Preconditions.checkState(!syncTables.isEmpty(), "No tables to be synchronized.");
- config.set(MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", syncTables) + ")");
+ config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
DataStreamSource<String> streamSource = buildCdcSource(env);
SingleOutputStreamOperator<Void> parsedStream = streamSource.process(new ParsingProcessFunction(converter));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 03aa90e..6fdbeac 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink.tools.cdc;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
@@ -28,7 +27,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-public class SourceSchema {
+public abstract class SourceSchema {
private final String databaseName;
private final String tableName;
private final String tableComment;
@@ -36,14 +35,14 @@ public class SourceSchema {
public final List<String> primaryKeys;
public SourceSchema(
- DatabaseMetaData metaData, String databaseName, String tableName, String tableComment)
+ DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment)
throws Exception {
this.databaseName = databaseName;
this.tableName = tableName;
this.tableComment = tableComment;
fields = new LinkedHashMap<>();
- try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) {
+ try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
String comment = rs.getString("REMARKS");
@@ -57,13 +56,13 @@ public class SourceSchema {
if (rs.wasNull()) {
scale = null;
}
- String dorisTypeStr = MysqlType.toDorisType(fieldType, precision, scale);
+ String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
}
}
primaryKeys = new ArrayList<>();
- try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
+ try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
primaryKeys.add(fieldName);
@@ -71,6 +70,8 @@ public class SourceSchema {
}
}
+ public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
+
public TableSchema convertTableSchema(Map<String, String> tableProps) {
TableSchema tableSchema = new TableSchema();
tableSchema.setModel(DataModel.UNIQUE);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 629e6e1..4d6d250 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -21,7 +21,6 @@ import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
@@ -49,8 +48,8 @@ import java.util.Properties;
public class MysqlDatabaseSync extends DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class);
-
private static String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true";
+ private static String PROPERTIES_PREFIX = "jdbc.properties.";
public MysqlDatabaseSync() {
}
@@ -80,7 +79,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
continue;
}
SourceSchema sourceSchema =
- new SourceSchema(metaData, databaseName, tableName, tableComment);
+ new MysqlSchema(metaData, databaseName, tableName, tableComment);
if (sourceSchema.primaryKeys.size() > 0) {
//Only sync tables with primary keys
schemaList.add(sourceSchema);
@@ -170,8 +169,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
- if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
- jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+ if (key.startsWith(PROPERTIES_PREFIX)) {
+ jdbcProperties.put(key.substring(PROPERTIES_PREFIX.length()), value);
} else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
debeziumProperties.put(
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
@@ -196,8 +195,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
- if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
- jdbcProps.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+ if (key.startsWith(PROPERTIES_PREFIX)) {
+ jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value);
}
}
return jdbcProps;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
new file mode 100644
index 0000000..ff47fd4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+public class MysqlSchema extends SourceSchema {
+
+ public MysqlSchema(DatabaseMetaData metaData, String databaseName, String tableName, String tableComment) throws Exception {
+ super(metaData, databaseName, null, tableName, tableComment);
+ }
+
+ public String convertToDorisType(String fieldType, Integer precision, Integer scale){
+ return MysqlType.toDorisType(fieldType, precision, scale);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
new file mode 100644
index 0000000..f90f858
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.oracle.OracleSource;
+import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
+import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+
+public class OracleDatabaseSync extends DatabaseSync {
+ private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class);
+
+ private static String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
+
+ public OracleDatabaseSync() {
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ String jdbcUrl;
+ if(!StringUtils.isNullOrWhitespaceOnly(config.get(OracleSourceOptions.URL))){
+ jdbcUrl = config.get(OracleSourceOptions.URL);
+ }else{
+ jdbcUrl = String.format(JDBC_URL, config.get(OracleSourceOptions.HOSTNAME), config.get(OracleSourceOptions.PORT),config.get(OracleSourceOptions.DATABASE_NAME));
+ }
+ Properties pro = new Properties();
+ pro.setProperty("user", config.get(OracleSourceOptions.USERNAME));
+ pro.setProperty("password", config.get(OracleSourceOptions.PASSWORD));
+ pro.put("remarksReporting", "true");
+ return DriverManager.getConnection(jdbcUrl, pro);
+ }
+
+ @Override
+ public List<SourceSchema> getSchemaList() throws Exception {
+ String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+ List<SourceSchema> schemaList = new ArrayList<>();
+ LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+ try (Connection conn = getConnection()) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ try (ResultSet tables =
+ metaData.getTables(databaseName, schemaName, "%", new String[]{"TABLE"})) {
+ while (tables.next()) {
+ String tableName = tables.getString("TABLE_NAME");
+ String tableComment = tables.getString("REMARKS");
+ if (!isSyncNeeded(tableName)) {
+ continue;
+ }
+ SourceSchema sourceSchema =
+ new OracleSchema(metaData, databaseName, schemaName, tableName, tableComment);
+ if (sourceSchema.primaryKeys.size() > 0) {
+ //Only sync tables with primary keys
+ schemaList.add(sourceSchema);
+ } else {
+ LOG.warn("table {} has no primary key, skip", tableName);
+ System.out.println("table " + tableName + " has no primary key, skip.");
+ }
+ }
+ }
+ }
+ return schemaList;
+ }
+
+ @Override
+ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+ String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+ Preconditions.checkNotNull(databaseName, "database-name in oracle is required");
+ Preconditions.checkNotNull(schemaName, "schema-name in oracle is required");
+ String tableName = config.get(OracleSourceOptions.TABLE_NAME);
+ String url = config.get(OracleSourceOptions.URL);
+ String hostname = config.get(OracleSourceOptions.HOSTNAME);
+ Integer port = config.get(OracleSourceOptions.PORT);
+ String username = config.get(OracleSourceOptions.USERNAME);
+ String password = config.get(OracleSourceOptions.PASSWORD);
+
+ StartupOptions startupOptions = StartupOptions.initial();
+ String startupMode = config.get(OracleSourceOptions.SCAN_STARTUP_MODE);
+ if ("initial".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.initial();
+ } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.latest();
+ }
+
+ //debezium properties set
+ Properties debeziumProperties = new Properties();
+ debeziumProperties.put("decimal.handling.mode", "string");
+ //date to string
+ debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);
+
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+ debeziumProperties.put(
+ key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+ }
+ }
+
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ JsonDebeziumDeserializationSchema schema =
+ new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+ if(config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)){
+ JdbcIncrementalSource<String> incrSource = OracleSourceBuilder.OracleIncrementalSource.<String>builder()
+ .hostname(hostname)
+ .url(url)
+ .port(port)
+ .databaseList(databaseName)
+ .schemaList(schemaName)
+ .tableList(schemaName + "." + tableName)
+ .username(username)
+ .password(password)
+ .startupOptions(startupOptions)
+ .deserializer(schema)
+ .debeziumProperties(debeziumProperties)
+ .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+ .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+ .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+ .connectTimeout(config.get(CONNECT_TIMEOUT))
+ .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+ .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+ .distributionFactorUpper(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+ .distributionFactorLower(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+ .build();
+ return env.fromSource(incrSource, WatermarkStrategy.noWatermarks(), "Oracle IncrSource");
+ }else{
+ DebeziumSourceFunction<String> oracleSource = OracleSource.<String>builder().url(url)
+ .hostname(hostname)
+ .port(port)
+ .username(username)
+ .password(password)
+ .database(databaseName)
+ .schemaList(schemaName)
+ .tableList(schemaName + "." + tableName)
+ .debeziumProperties(debeziumProperties)
+ .startupOptions(startupOptions)
+ .deserializer(schema)
+ .build();
+ return env.addSource(oracleSource, "Oracle Source");
+ }
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
new file mode 100644
index 0000000..d7bd8fd
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import oracle.sql.TIMESTAMP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class OracleDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+ private static final Logger log = LoggerFactory.getLogger(OracleDateConverter.class);
+ private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
+ private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
+ private static final Pattern TIMESTAMP_OR_DATE_REGEX = Pattern.compile("^TIMESTAMP[(]\\d[)]$|^DATE$", Pattern.CASE_INSENSITIVE);
+ private ZoneId timestampZoneId = ZoneId.systemDefault();
+ public static Properties DEFAULT_PROPS = new Properties();
+ private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+ private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+ private final DateTimeFormatter dateTimeV2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+
+ static {
+ DEFAULT_PROPS.setProperty("converters", "oracleDate");
+ DEFAULT_PROPS.setProperty("oracleDate.type", "org.apache.doris.flink.tools.cdc.oracle.OracleDateConverter");
+ }
+
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
+ .parseCaseInsensitive()
+ .appendPattern("yyyy-MM-dd HH:mm:ss")
+ .optionalStart()
+ .appendPattern(".")
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
+ .optionalEnd()
+ .toFormatter();
+
+ @Override
+ public void configure(Properties props) {
+ }
+
+ @Override
+ public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+ String typeName = column.typeName();
+ if (TIMESTAMP_OR_DATE_REGEX.matcher(typeName).matches()) {
+ registration.register(SchemaBuilder.string().optional(), value -> {
+ if (value == null) {
+ if (column.isOptional()) {
+ return null;
+ } else if (column.hasDefaultValue()) {
+ return column.defaultValue();
+ } else {
+ return null;
+ }
+ }
+
+ if (value instanceof String) {
+ return convertStringTimestamp((String) value);
+ }
+ if(value instanceof Timestamp){
+ return dateTimeV2Formatter.format(((Timestamp) value).toLocalDateTime());
+ }
+
+ //oracle timestamp
+ try{
+ if (value instanceof TIMESTAMP){
+ return dateTimeV2Formatter.format(((TIMESTAMP) value).timestampValue().toLocalDateTime());
+ }
+ }catch (SQLException ex){
+ log.error("convert timestamp failed, values is {}", value);
+ }
+
+ return null;
+ });
+ }
+ }
+
+ private String convertStringTimestamp(String data) {
+ LocalDateTime dateTime;
+
+ final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(data);
+ if (toTimestampMatcher.matches()) {
+ String dateText = toTimestampMatcher.group(1);
+ dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(completeMilliseconds(dateText.trim())));
+ return dateTimeV2Formatter.format(dateTime.atZone(timestampZoneId));
+ }
+
+ final Matcher toDateMatcher = TO_DATE.matcher(data);
+ if (toDateMatcher.matches()) {
+ String date = toDateMatcher.group(1);
+ dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(completeMilliseconds(date.trim())));
+ return dateTimeV2Formatter.format(dateTime.atZone(timestampZoneId));
+ }
+ return null;
+ }
+
+ private String completeMilliseconds(String stringValue) {
+ if(stringValue.length() == DATETIMEV2_PATTERN.length()){
+ return stringValue;
+ }
+ StringBuilder sb = new StringBuilder(stringValue);
+ if(stringValue.length() == DATETIME_PATTERN.length()){
+ sb.append(".");
+ }
+ while (sb.toString().length() < DATETIMEV2_PATTERN.length()){
+ sb.append(0);
+ }
+ return sb.toString();
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
new file mode 100644
index 0000000..464c9a3
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+/**
+ * {@link SourceSchema} specialization for Oracle sources. The schema itself is
+ * read through the generic JDBC metadata path in the superclass; this class only
+ * supplies the Oracle-specific type mapping via {@link OracleType}.
+ */
+public class OracleSchema extends SourceSchema {
+
+    public OracleSchema(
+            DatabaseMetaData metaData,
+            String databaseName,
+            String schemaName,
+            String tableName,
+            String tableComment)
+            throws Exception {
+        super(metaData, databaseName, schemaName, tableName, tableComment);
+    }
+
+    @Override
+    public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+        return OracleType.toDorisType(fieldType, precision, scale);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java
new file mode 100644
index 0000000..2024691
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.oracle;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.flink.util.Preconditions;
+
+public class OracleType {
+ private static final String VARCHAR2 = "VARCHAR2";
+ private static final String NVARCHAR2 = "NVARCHAR2";
+ private static final String NUMBER = "NUMBER";
+ private static final String FLOAT = "FLOAT";
+ private static final String LONG = "LONG";
+ private static final String DATE = "DATE";
+ private static final String BINARY_FLOAT = "BINARY_FLOAT";
+ private static final String BINARY_DOUBLE = "BINARY_DOUBLE";
+ private static final String TIMESTAMP = "TIMESTAMP";
+ private static final String INTERVAL = "INTERVAL";
+ private static final String RAW = "RAW";
+ private static final String LONG_RAW = "LONG RAW";
+ private static final String ROWID = "ROWID";
+ private static final String UROWID = "UROWID";
+ private static final String CHAR = "CHAR";
+ private static final String NCHAR = "NCHAR";
+ private static final String CLOB = "CLOB";
+ private static final String NCLOB = "NCLOB";
+ private static final String BLOB = "BLOB";
+ private static final String BFILE = "BFILE";
+
+ public static String toDorisType(String oracleType, Integer precision, Integer scale) {
+ oracleType = oracleType.toUpperCase();
+ if(oracleType.startsWith(INTERVAL)){
+ oracleType = oracleType.substring(0, 8);
+ } else if (oracleType.startsWith(TIMESTAMP)) {
+ return String.format("%s(%s)", DorisType.DATETIME_V2, 6);
+ }
+ switch (oracleType){
+ case NUMBER:
+ if (scale <= 0) {
+ precision -= scale;
+ if (precision < 3) {
+ return DorisType.TINYINT;
+ } else if (precision < 5) {
+ return DorisType.SMALLINT;
+ } else if (precision < 10) {
+ return DorisType.INT;
+ } else if (precision < 19) {
+ return DorisType.BIGINT;
+ } else if (precision < 39) {
+ // LARGEINT supports up to 38 numbers.
+ return DorisType.LARGEINT;
+ } else {
+ return DorisType.STRING;
+ }
+ }
+ // scale > 0
+ if (precision < scale) {
+ precision = scale;
+ }
+ return precision != null && precision <= 38
+ ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
+ : DorisType.STRING;
+ case FLOAT:
+ return DorisType.DOUBLE;
+ case DATE:
+ // can save date and time with second precision
+ return DorisType.DATETIME_V2;
+ case CHAR:
+ case VARCHAR2:
+ case NCHAR:
+ case NVARCHAR2:
+ Preconditions.checkNotNull(precision);
+ return precision * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
+ case LONG:
+ case RAW:
+ case LONG_RAW:
+ case INTERVAL:
+ case BLOB:
+ case CLOB:
+ case NCLOB:
+ return DorisType.STRING;
+ case BFILE:
+ case BINARY_FLOAT:
+ case BINARY_DOUBLE:
+ default:
+ throw new UnsupportedOperationException("Unsupported Oracle Type: " + oracleType);
+ }
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index cbd3b38..c940477 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -65,7 +65,7 @@ public class CdcMysqlSyncDatabaseCase {
String includingTables = "tbl1|tbl2|tbl3";
String excludingTables = "";
DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig);
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig, false);
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
similarity index 57%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
copy to flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index cbd3b38..23610ab 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -16,7 +16,7 @@
// under the License.
package org.apache.doris.flink.tools.cdc;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -24,32 +24,38 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-public class CdcMysqlSyncDatabaseCase {
+public class CdcOraclelSyncDatabaseCase {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-// env.setParallelism(1);
+ env.setParallelism(1);
+ env.disableOperatorChaining();
+ env.enableCheckpointing(10000);
- Map<String,String> flinkMap = new HashMap<>();
- flinkMap.put("execution.checkpointing.interval","10s");
- flinkMap.put("pipeline.operator-chaining","false");
- flinkMap.put("parallelism.default","1");
-
-
- Configuration configuration = Configuration.fromMap(flinkMap);
- env.configure(configuration);
+// Map<String,String> flinkMap = new HashMap<>();
+// flinkMap.put("execution.checkpointing.interval","10s");
+// flinkMap.put("pipeline.operator-chaining","false");
+// flinkMap.put("parallelism.default","1");
+//
+//
+// Configuration configuration = Configuration.fromMap(flinkMap);
+// env.configure(configuration);
String database = "db1";
String tablePrefix = "";
String tableSuffix = "";
- Map<String,String> mysqlConfig = new HashMap<>();
- mysqlConfig.put("database-name","db1");
- mysqlConfig.put("hostname","127.0.0.1");
- mysqlConfig.put("port","3306");
- mysqlConfig.put("username","root");
- mysqlConfig.put("password","");
- Configuration config = Configuration.fromMap(mysqlConfig);
+ Map<String,String> sourceConfig = new HashMap<>();
+ sourceConfig.put("database-name","XE");
+ sourceConfig.put("schema-name","ADMIN");
+ sourceConfig.put("hostname","127.0.0.1");
+ sourceConfig.put("port","1521");
+ sourceConfig.put("username","admin");
+ sourceConfig.put("password","");
+// sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+ sourceConfig.put("debezium.log.mining.strategy","online_catalog");
+ sourceConfig.put("debezium.log.mining.continuous.mine","true");
+ Configuration config = Configuration.fromMap(sourceConfig);
Map<String,String> sinkConfig = new HashMap<>();
sinkConfig.put("fenodes","127.0.0.1:8030");
@@ -62,12 +68,12 @@ public class CdcMysqlSyncDatabaseCase {
Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
- String includingTables = "tbl1|tbl2|tbl3";
+ String includingTables = "test.*";
String excludingTables = "";
- DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig);
+ DatabaseSync databaseSync = new OracleDatabaseSync();
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig, false);
databaseSync.build();
- env.execute(String.format("MySQL-Doris Database Sync: %s", database));
+ env.execute(String.format("Oracle-Doris Database Sync: %s", database));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org