Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/13 09:24:27 UTC

[GitHub] [inlong] e-mhui opened a new pull request, #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

e-mhui opened a new pull request, #6175:
URL: https://github.com/apache/inlong/pull/6175

   *Sort support all migrate for Oracle connector*
   
   - Fixes #6045
   
   ### Motivation
   
   Sometimes users want to export all data from a database, across tables with different schemas, to Kafka in canal-json format.
   Flink CDC cannot do this on its own, because the source schema must be specified when writing the SQL.
   
   ### Modifications
   
   refer: #4010
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002933462


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/JsonDebeziumDeserializationSchema.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
+/**
+ * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
+ * received {@link SourceRecord} to JSON String.
+ */
+public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

Review Comment:
   done.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002938240


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java:
##########
@@ -54,6 +54,10 @@ default String getMetadataKey(MetaField metaField) {
             case OP_TS:
                 metadataKey = "op_ts";
                 break;
+            case DATA:
+            case DATA_BYTES:
+                metadataKey = "meta.data";

Review Comment:
   Maybe it is better to add support for 'DATA_CANAL'?
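
A minimal sketch of what this suggestion could look like, assuming `DATA_CANAL` should share the `"meta.data"` key with `DATA` and `DATA_BYTES` (the thread does not say which key it maps to). The `MetaField` enum here is a hypothetical stand-in listing only the constants mentioned in this review, not the real `org.apache.inlong.common.enums.MetaField`.

```java
public class MetadataKeySketch {

    // Hypothetical stand-in for the real MetaField enum; only the
    // constants discussed in this thread are listed.
    public enum MetaField { OP_TS, DATA, DATA_BYTES, DATA_CANAL }

    // Mirrors the switch in the diff hunk above: several cases fall
    // through to a single metadata key.
    public static String getMetadataKey(MetaField metaField) {
        String metadataKey;
        switch (metaField) {
            case OP_TS:
                metadataKey = "op_ts";
                break;
            case DATA:
            case DATA_BYTES:
            case DATA_CANAL: // assumption: same key as DATA / DATA_BYTES
                metadataKey = "meta.data";
                break;
            default:
                throw new UnsupportedOperationException(
                        String.format("Unsupported meta field: %s", metaField));
        }
        return metadataKey;
    }
}
```

Grouping the cases keeps the key defined in one place, so a later rename of the key cannot leave the variants out of sync.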





[GitHub] [inlong] EMsnap commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996659903


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleValidator.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.inlong.sort.cdc.oracle.debezium.Validator;
+import org.apache.inlong.sort.cdc.oracle.util.OracleJdbcUrlUtils;
+
+/** Validates the version of the database connecting to. */
+public class OracleValidator implements Validator {

Review Comment:
   Can this class be replaced by the one provided by the Flink CDC dependency?





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002965623


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java:
##########
@@ -54,6 +54,10 @@ default String getMetadataKey(MetaField metaField) {
             case OP_TS:
                 metadataKey = "op_ts";
                 break;
+            case DATA:
+            case DATA_BYTES:
+                metadataKey = "meta.data";

Review Comment:
   done.





[GitHub] [inlong] dockerzhang merged pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #6175:
URL: https://github.com/apache/inlong/pull/6175




[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002872351


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##########
@@ -130,6 +130,12 @@ public final class Constants {
                             + "is used extract database name from the raw binary data, "
                             + "this is only used in the multiple sink writing scenario.");
 
+    public static final ConfigOption<Boolean> SOURCE_MULTIPLE_ENABLE =
+            ConfigOptions.key("source.multiple.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether migrate multiple databases");

Review Comment:
   There is only one source for now, so it seems more appropriate to call it 'Whether enable migrate multiple databases'.





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996733347


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java:
##########
@@ -130,13 +129,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .setResultTypeInfo(typeInfo)
                         .setUserDefinedConverterFactory(
                                 OracleDeserializationConverterFactory.instance())
+                        .setMigrateAll(migrateAll)
                         .build();
         OracleSource.Builder<RowData> builder =
                 OracleSource.<RowData>builder()
                         .hostname(hostname)
                         .port(port)
                         .database(database)
-                        .tableList(schemaName + "." + tableName)
+                        .tableList(tableName)

Review Comment:
   The Manager does not need to change. The previous rule for `table-name` was `<tableName>`; it now needs to be `<schemaName>.<tableName>`.
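
The rule change described in this comment can be sketched as follows. This is an illustration only: the class and method names are hypothetical, and `SCOTT` / `ORDERS` are made-up values. It shows that the string handed to `tableList(...)` stays the same, while the responsibility for supplying the schema prefix moves from the connector to the `table-name` value.

```java
public class TableListSketch {

    // Before the change: `table-name` held only <tableName>, and the
    // connector prefixed the schema itself, as in the removed diff line
    // `.tableList(schemaName + "." + tableName)`.
    public static String tableListBefore(String schemaName, String tableName) {
        return schemaName + "." + tableName;
    }

    // After the change: `table-name` is expected to already be
    // <schemaName>.<tableName>, so it is passed through unchanged,
    // as in the added diff line `.tableList(tableName)`.
    public static String tableListAfter(String tableName) {
        return tableName;
    }
}
```

With the old rule, `tableListBefore("SCOTT", "ORDERS")` and, with the new rule, `tableListAfter("SCOTT.ORDERS")` both yield `SCOTT.ORDERS`.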
   





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996733347


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java:
##########
@@ -130,13 +129,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .setResultTypeInfo(typeInfo)
                         .setUserDefinedConverterFactory(
                                 OracleDeserializationConverterFactory.instance())
+                        .setMigrateAll(migrateAll)
                         .build();
         OracleSource.Builder<RowData> builder =
                 OracleSource.<RowData>builder()
                         .hostname(hostname)
                         .port(port)
                         .database(database)
-                        .tableList(schemaName + "." + tableName)
+                        .tableList(tableName)

Review Comment:
   The Manager does not need to change. The previous rule for `table name` was `<tableName>`; it now needs to be `<schemaName>.<tableName>`.
   





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002383107


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java:
##########
@@ -95,6 +95,12 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
                             "Optional startup mode for Oracle CDC consumer, valid enumerations are "
                                     + "\"initial\", \"latest-offset\"");
 
+    public static final ConfigOption<Boolean> MIGRATE_ALL =
+            ConfigOptions.key("migrate-all")

Review Comment:
   done.





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r997826169


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AllMigrateOracleTest {
+
+    private OracleExtractNode buildAllMigrateExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new MetaFieldInfo("data", MetaField.DATA));
+        Map<String, String> option = new HashMap<>();

Review Comment:
   done.





[GitHub] [inlong] gong commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r997766202


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AllMigrateOracleTest {
+
+    private OracleExtractNode buildAllMigrateExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new MetaFieldInfo("data", MetaField.DATA));
+        Map<String, String> option = new HashMap<>();

Review Comment:
   Does it need to support MetaField.DATA_BYTES? @e-mhui @EMsnap
   





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996784132


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleValidator.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.inlong.sort.cdc.oracle.debezium.Validator;
+import org.apache.inlong.sort.cdc.oracle.util.OracleJdbcUrlUtils;
+
+/** Validates the version of the database connecting to. */
+public class OracleValidator implements Validator {

Review Comment:
   done.





[GitHub] [inlong] EMsnap commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996656309


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java:
##########
@@ -130,13 +129,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .setResultTypeInfo(typeInfo)
                         .setUserDefinedConverterFactory(
                                 OracleDeserializationConverterFactory.instance())
+                        .setMigrateAll(migrateAll)
                         .build();
         OracleSource.Builder<RowData> builder =
                 OracleSource.<RowData>builder()
                         .hostname(hostname)
                         .port(port)
                         .database(database)
-                        .tableList(schemaName + "." + tableName)
+                        .tableList(tableName)

Review Comment:
   Note that the Manager should change its parameter rules accordingly.





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996866275


##########
licenses/inlong-sort-connectors/LICENSE:
##########
@@ -489,6 +489,30 @@
        inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
        inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
        inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/history/FlinkJsonTableChangeSerializer.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumChangeConsumer.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumChangeFetcher.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumOffset.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumOffsetSerializer.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/FlinkDatabaseHistory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/FlinkDatabaseSchemaHistory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/FlinkOffsetBackingStore.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/Handover.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/SchemaRecord.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/AppendMetadataCollector.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/DebeziumOptions.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/DeserializationRuntimeConverter.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/DeserializationRuntimeConverterFactory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/MetadataConverter.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/utils/DatabaseHistoryUtil.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/utils/TemporalConversions.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumDeserializationSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/JsonDebeziumDeserializationSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/StringDebeziumDeserializationSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleDeserializationConverterFactory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java

Review Comment:
   done.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002857978


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##########
@@ -130,6 +130,12 @@ public final class Constants {
                             + "is used extract database name from the raw binary data, "
                             + "this is only used in the multiple sink writing scenario.");
 
+    public static final ConfigOption<Boolean> SOURCE_MULTIPLE_ENABLE =
+            ConfigOptions.key("source.multiple.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether migrate multiple databases");

Review Comment:
   Maybe it would be better to call it 'Whether enable multiple source'?





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996733347


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java:
##########
@@ -130,13 +129,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .setResultTypeInfo(typeInfo)
                         .setUserDefinedConverterFactory(
                                 OracleDeserializationConverterFactory.instance())
+                        .setMigrateAll(migrateAll)
                         .build();
         OracleSource.Builder<RowData> builder =
                 OracleSource.<RowData>builder()
                         .hostname(hostname)
                         .port(port)
                         .database(database)
-                        .tableList(schemaName + "." + tableName)
+                        .tableList(tableName)

Review Comment:
   The Manager does not need to change. The previous rule for `table name` was `<tableName>`; it now needs to be `<schemaName>.<tableName>`.
   





[GitHub] [inlong] EMsnap commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996845423


##########
licenses/inlong-sort-connectors/LICENSE:
##########
@@ -489,6 +489,30 @@
        inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
        inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
        inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/history/FlinkJsonTableChangeSerializer.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumChangeConsumer.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumChangeFetcher.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumOffset.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumOffsetSerializer.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/FlinkDatabaseHistory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/FlinkDatabaseSchemaHistory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/FlinkOffsetBackingStore.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/Handover.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/SchemaRecord.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/AppendMetadataCollector.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/DebeziumOptions.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/DeserializationRuntimeConverter.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/DeserializationRuntimeConverterFactory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/MetadataConverter.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/utils/DatabaseHistoryUtil.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/utils/TemporalConversions.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumDeserializationSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/JsonDebeziumDeserializationSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/StringDebeziumDeserializationSchema.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleDeserializationConverterFactory.java
+       inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java

Review Comment:
   Are these Oracle files copied from Elasticsearch?





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996853672


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumChangeConsumer.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.debezium.internal;
+
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Consume debezium change events. */
+@Internal
+public class DebeziumChangeConsumer

Review Comment:
   done.





[GitHub] [inlong] e-mhui commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002411693


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java:
##########
@@ -95,6 +95,12 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
                             "Optional startup mode for Oracle CDC consumer, valid enumerations are "
                                     + "\"initial\", \"latest-offset\"");
 
+    public static final ConfigOption<Boolean> MIGRATE_ALL =

Review Comment:
   done.





[GitHub] [inlong] EMsnap commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r1002920708


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/JsonDebeziumDeserializationSchema.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
+/**
+ * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
+ * received {@link SourceRecord} to JSON String.
+ */
+public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

Review Comment:
   Are `JsonDebeziumDeserializationSchema` and `StringDebeziumDeserializationSchema` ever used?





[GitHub] [inlong] EMsnap commented on a diff in pull request #6175: [INLONG-6045][Sort] Sort support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r996658557


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/internal/DebeziumChangeConsumer.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.debezium.internal;
+
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Consume debezium change events. */
+@Internal
+public class DebeziumChangeConsumer

Review Comment:
   The classes copied from Flink CDC should be noted in the LICENSE file.





[GitHub] [inlong] gong commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r997766202


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AllMigrateOracleTest {
+
+    private OracleExtractNode buildAllMigrateExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new MetaFieldInfo("data", MetaField.DATA));
+        Map<String, String> option = new HashMap<>();

Review Comment:
   Does it need to support `MetaField.DATA_BYTES`? @e-mhui @EMsnap
   





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6175: [INLONG-6045][Sort] Support all migrate for Oracle connector

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6175:
URL: https://github.com/apache/inlong/pull/6175#discussion_r998281899


##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java:
##########
@@ -95,6 +95,12 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
                             "Optional startup mode for Oracle CDC consumer, valid enumerations are "
                                     + "\"initial\", \"latest-offset\"");
 
+    public static final ConfigOption<Boolean> MIGRATE_ALL =

Review Comment:
   Maybe it can also be extracted to the `Constants` class of the 'base' module?



##########
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java:
##########
@@ -95,6 +95,12 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
                             "Optional startup mode for Oracle CDC consumer, valid enumerations are "
                                     + "\"initial\", \"latest-offset\"");
 
+    public static final ConfigOption<Boolean> MIGRATE_ALL =
+            ConfigOptions.key("migrate-all")

Review Comment:
   Maybe it is better to call it 'source.multiple.enable'?


