Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/26 01:35:47 UTC

[inlong] branch master updated: [INLONG-7909][Sort] Fix Oracle CDC cannot capture ddl changes (#7917)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b70a1591 [INLONG-7909][Sort] Fix Oracle CDC cannot capture ddl changes (#7917)
0b70a1591 is described below

commit 0b70a1591a09cf098c679af51080ddce27148b81
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Wed Apr 26 09:35:41 2023 +0800

    [INLONG-7909][Sort] Fix Oracle CDC cannot capture ddl changes (#7917)
---
 inlong-sort/sort-connectors/oracle-cdc/pom.xml     |   1 +
 .../oracle/logminer/LogMinerQueryBuilder.java      | 241 ++++++++++
 .../LogMinerStreamingChangeEventSource.java        | 525 +++++++++++++++++++++
 licenses/inlong-sort-connectors/LICENSE            |   6 +
 4 files changed, 773 insertions(+)

diff --git a/inlong-sort/sort-connectors/oracle-cdc/pom.xml b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
index f097196a4..5c3de2611 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
@@ -125,6 +125,7 @@
                                     <artifact>org.apache.inlong:sort-connector-*</artifact>
                                     <includes>
                                         <include>org/apache/inlong/**</include>
+                                        <include>io/debezium/connector/oracle/**</include>
                                         <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
                                     </includes>
                                 </filter>
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
new file mode 100644
index 000000000..6cff235fd
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer;
+
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
+import io.debezium.util.Strings;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * A builder that is responsible for producing the query to be executed against the LogMiner view.
+ *
+ * @author Chris Cranford
+ * Copied from 'debezium-connector-oracle 1.6.4.Final' to fix the failure to capture DDL changes made by the connector user.
+ */
+public class LogMinerQueryBuilder {
+
+    private static final String LOGMNR_CONTENTS_VIEW = "V$LOGMNR_CONTENTS";
+
+    /**
+     * Builds the LogMiner contents view query.
+     *
+     * The returned query contains 2 bind parameters that the caller is responsible for binding before
+     * executing the query.  The first bind parameter is the exclusive lower bound of the SCN mining
+     * window, while the second is the inclusive upper bound.
+     *
+     * The built query relies on the following columns from V$LOGMNR_CONTENTS:
+     * <pre>
+     *     SCN - the system change number at which the change was made
+     *     SQL_REDO - the reconstructed SQL statement that initiated the change
+     *     OPERATION - the database operation type name
+     *     OPERATION_CODE - the database operation numeric code
+     *     TIMESTAMP - the time when the change event occurred
+     *     XID - the transaction identifier the change participated in
+     *     CSF - the continuation flag, identifies rows that should be processed together as a single row, 0=no, 1=yes
+     *     TABLE_NAME - the name of the table that the change applies to
+     *     SEG_OWNER - the name of the schema that the change applies to
+     *     USERNAME - the name of the database user that caused the change
+     *     ROW_ID - the unique identifier of the changed row, may not always be set to a valid value
+     *     ROLLBACK - the rollback flag, value of 0 or 1.  1 implies the row was rolled back
+     *     RS_ID - the rollback segment identifier where the change record was recorded
+     * </pre>
+     *
+     * @param connectorConfig connector configuration, should not be {@code null}
+     * @param schema database schema, should not be {@code null}
+     * @return the SQL string to be used to fetch changes from Oracle LogMiner
+     */
+    public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
+        final StringBuilder query = new StringBuilder(1024);
+        query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, ");
+        query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID ");
+        query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");
+
+        // These bind parameters will be bound when the query is executed by the caller.
+        query.append("WHERE SCN > ? AND SCN <= ? ");
+
+        // Restrict to configured PDB if one is supplied
+        final String pdbName = connectorConfig.getPdbName();
+        if (!Strings.isNullOrEmpty(pdbName)) {
+            query.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' ");
+        }
+
+        query.append("AND (");
+
+        // Always include START, COMMIT, MISSING_SCN, and ROLLBACK operations
+        query.append("(OPERATION_CODE IN (6,7,34,36)");
+
+        if (!schema.storeOnlyCapturedTables()) {
+            // In this mode, the connector will always be fed DDL operations for all tables even if they
+            // are not part of the inclusion/exclusion lists.
+            query.append(" OR ").append(buildDdlPredicate()).append(" ");
+            // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
+            if (connectorConfig.isLobEnabled()) {
+                query.append(") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) ");
+            } else {
+                query.append(") OR (OPERATION_CODE IN (1,2,3) ");
+            }
+        } else {
+            // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
+            if (connectorConfig.isLobEnabled()) {
+                query.append(") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) ");
+            } else {
+                query.append(") OR ((OPERATION_CODE IN (1,2,3) ");
+            }
+            // In this mode, the connector will filter DDL operations based on the table inclusion/exclusion lists
+            query.append("OR ").append(buildDdlPredicate()).append(") ");
+        }
+
+        // Always ignore the flush table
+        query.append("AND TABLE_NAME != '").append(SqlUtils.LOGMNR_FLUSH_TABLE).append("' ");
+
+        // Some common schemas are automatically ignored when building the runtime filter
+        // predicates; the same list of schemas is excluded here in the generated SQL.
+        if (!OracleConnectorConfig.EXCLUDED_SCHEMAS.isEmpty()) {
+            query.append("AND SEG_OWNER NOT IN (");
+            for (Iterator<String> i = OracleConnectorConfig.EXCLUDED_SCHEMAS.iterator(); i.hasNext();) {
+                String excludedSchema = i.next();
+                query.append("'").append(excludedSchema.toUpperCase()).append("'");
+                if (i.hasNext()) {
+                    query.append(",");
+                }
+            }
+            query.append(") ");
+        }
+
+        String schemaPredicate = buildSchemaPredicate(connectorConfig);
+        if (!Strings.isNullOrEmpty(schemaPredicate)) {
+            query.append("AND ").append(schemaPredicate).append(" ");
+        }
+
+        String tablePredicate = buildTablePredicate(connectorConfig);
+        if (!Strings.isNullOrEmpty(tablePredicate)) {
+            query.append("AND ").append(tablePredicate).append(" ");
+        }
+
+        query.append("))");
+
+        return query.toString();
+    }
+
+    /**
+     * Builds a common SQL fragment used to obtain DDL operations via LogMiner.
+     *
+     * @return predicate that can be used to obtain DDL operations via LogMiner
+     */
+    private static String buildDdlPredicate() {
+        final StringBuilder predicate = new StringBuilder(256);
+        predicate.append("(OPERATION_CODE = 5 ");
+        predicate.append("AND USERNAME NOT IN ('SYS','SYSTEM') ");
+        predicate.append("AND INFO NOT LIKE 'INTERNAL DDL%' ");
+        predicate.append("AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))");
+        return predicate.toString();
+    }
+
+    /**
+     * Builds a SQL predicate of what schemas to include/exclude based on the connector configuration.
+     *
+     * @param connectorConfig connector configuration, should not be {@code null}
+     * @return SQL predicate to filter results based on schema include/exclude configurations
+     */
+    private static String buildSchemaPredicate(OracleConnectorConfig connectorConfig) {
+        StringBuilder predicate = new StringBuilder();
+        if (Strings.isNullOrEmpty(connectorConfig.schemaIncludeList())) {
+            if (!Strings.isNullOrEmpty(connectorConfig.schemaExcludeList())) {
+                List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaExcludeList(), 0);
+                predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", true)).append(")");
+            }
+        } else {
+            List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaIncludeList(), 0);
+            predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", false)).append(")");
+        }
+        return predicate.toString();
+    }
+
+    /**
+     * Builds a SQL predicate of what tables to include/exclude based on the connector configuration.
+     *
+     * @param connectorConfig connector configuration, should not be {@code null}
+     * @return SQL predicate to filter results based on table include/exclude configuration
+     */
+    private static String buildTablePredicate(OracleConnectorConfig connectorConfig) {
+        StringBuilder predicate = new StringBuilder();
+        if (Strings.isNullOrEmpty(connectorConfig.tableIncludeList())) {
+            if (!Strings.isNullOrEmpty(connectorConfig.tableExcludeList())) {
+                List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableExcludeList(), 0);
+                predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", true))
+                        .append(")");
+            }
+        } else {
+            List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableIncludeList(), 0);
+            predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", false))
+                    .append(")");
+        }
+        return predicate.toString();
+    }
+
+    /**
+     * Takes a list of reg-ex patterns and builds an Oracle-specific predicate using {@code REGEXP_LIKE}
+     * in order to take the connector configuration include/exclude lists and assemble them as SQL
+     * predicates.
+     *
+     * @param patterns list of the individual include/exclude reg-ex patterns from the connector configuration
+     * @param columnName the column against which the reg-ex patterns are applied
+     * @param inclusion {@code true} to negate each match and join the terms with {@code AND} (exclude lists), {@code false} to join them with {@code OR} (include lists)
+     * @return SQL predicate that applies the reg-ex patterns to the given column
+     */
+    private static String listOfPatternsToSql(List<Pattern> patterns, String columnName, boolean inclusion) {
+        StringBuilder predicate = new StringBuilder();
+        for (Iterator<Pattern> i = patterns.iterator(); i.hasNext();) {
+            Pattern pattern = i.next();
+            if (inclusion) {
+                predicate.append("NOT ");
+            }
+            // NOTE: The REGEXP_LIKE operator was added in Oracle 10g (10.1.0.0.0)
+            final String text = resolveRegExpLikePattern(pattern);
+            predicate.append("REGEXP_LIKE(").append(columnName).append(",'").append(text).append("','i')");
+            if (i.hasNext()) {
+                // Exclude lists combine their terms via AND, include lists via OR
+                predicate.append(inclusion ? " AND " : " OR ");
+            }
+        }
+        return predicate.toString();
+    }
+
+    /**
+     * The Oracle {@code REGEXP_LIKE} operator acts much like the {@code LIKE} operator: internally,
+     * it behaves as though a "%" qualifier were prepended and appended. The include/exclude lists are
+     * meant to be explicit, carrying an implied "^" and "$" anchor for start/end, so that the match
+     * does not mistakenly capture "DEBEZIUM2" when the reg-ex is "DEBEZIUM".
+     *
+     * @param pattern the pattern to be analyzed, should not be {@code null}
+     * @return the pattern text, anchored with "^" and "$" where not already explicitly specified
+     */
+    private static String resolveRegExpLikePattern(Pattern pattern) {
+        String text = pattern.pattern();
+        if (!text.startsWith("^")) {
+            text = "^" + text;
+        }
+        if (!text.endsWith("$")) {
+            text += "$";
+        }
+        return text;
+    }
+}
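
For reference, the contract described in the build() javadoc above can be exercised as in the
following sketch, which mirrors the prepare/bind sequence used by LogMinerStreamingChangeEventSource
below. It is illustrative only: the connectorConfig, schema, connection, startScn and endScn
variables are assumed to be in scope, and none of these lines are part of the commit.

    // Build the V$LOGMNR_CONTENTS query; its two '?' placeholders are the
    // exclusive lower and the inclusive upper bound of the SCN mining window.
    String query = LogMinerQueryBuilder.build(connectorConfig, schema);
    try (PreparedStatement miningView = connection.prepareStatement(query,
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
        miningView.setString(1, startScn.toString()); // WHERE SCN > ?
        miningView.setString(2, endScn.toString());   // AND SCN <= ?
        try (ResultSet rs = miningView.executeQuery()) {
            // each row is one LogMiner record: SCN, SQL_REDO, OPERATION_CODE, ...
        }
    }

    // Include/exclude patterns are anchored before being embedded in the SQL;
    // for example, a schema include list entry of "DEBEZIUM" becomes
    //     REGEXP_LIKE(SEG_OWNER,'^DEBEZIUM$','i')
    // so that a schema named "DEBEZIUM2" is not captured by accident.
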
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
new file mode 100644
index 000000000..868fa73b2
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer;
+
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.buildDataDictionary;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.checkSupplementalLogging;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.endMining;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getCurrentRedoLogFiles;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getEndScn;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getFirstOnlineLogScn;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getLastScnToAbandon;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getSystime;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.instantiateFlushConnections;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.logError;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.setNlsSessionParameters;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.startLogMining;
+
+import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
+import io.debezium.connector.oracle.OracleConnection;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
+import io.debezium.connector.oracle.OracleOffsetContext;
+import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
+import io.debezium.connector.oracle.Scn;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import io.debezium.util.Stopwatch;
+import java.math.BigInteger;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility. The event handler loop
+ * is executed in a separate executor, and a hook method afterHandleScn has been added.
+ * Copied from 'debezium-connector-oracle 1.6.4.Final' to fix the failure to capture DDL changes made by the connector user.
+ */
+public class LogMinerStreamingChangeEventSource
+        implements
+            StreamingChangeEventSource<OracleOffsetContext> {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(LogMinerStreamingChangeEventSource.class);
+
+    private final OracleConnection jdbcConnection;
+    private final EventDispatcher<TableId> dispatcher;
+    private final Clock clock;
+    private final OracleDatabaseSchema schema;
+    private final boolean isRac;
+    private final Set<String> racHosts = new HashSet<>();
+    private final JdbcConfiguration jdbcConfiguration;
+    private final OracleConnectorConfig.LogMiningStrategy strategy;
+    private final ErrorHandler errorHandler;
+    private final boolean isContinuousMining;
+    private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
+    private final OracleConnectorConfig connectorConfig;
+    private final Duration archiveLogRetention;
+    private final boolean archiveLogOnlyMode;
+    private final String archiveDestinationName;
+
+    private Scn startScn;
+    private Scn endScn;
+    private List<BigInteger> currentRedoLogSequences;
+
+    public LogMinerStreamingChangeEventSource(
+            OracleConnectorConfig connectorConfig,
+            OracleConnection jdbcConnection,
+            EventDispatcher<TableId> dispatcher,
+            ErrorHandler errorHandler,
+            Clock clock,
+            OracleDatabaseSchema schema,
+            Configuration jdbcConfig,
+            OracleStreamingChangeEventSourceMetrics streamingMetrics) {
+        this.jdbcConnection = jdbcConnection;
+        this.dispatcher = dispatcher;
+        this.clock = clock;
+        this.schema = schema;
+        this.connectorConfig = connectorConfig;
+        this.strategy = connectorConfig.getLogMiningStrategy();
+        this.isContinuousMining = connectorConfig.isContinuousMining();
+        this.errorHandler = errorHandler;
+        this.streamingMetrics = streamingMetrics;
+        this.jdbcConfiguration = JdbcConfiguration.adapt(jdbcConfig);
+        this.isRac = connectorConfig.isRacSystem();
+        if (this.isRac) {
+            this.racHosts.addAll(
+                    connectorConfig.getRacNodes().stream()
+                            .map(String::toUpperCase)
+                            .collect(Collectors.toSet()));
+            instantiateFlushConnections(jdbcConfiguration, racHosts);
+        }
+        this.archiveLogRetention = connectorConfig.getLogMiningArchiveLogRetention();
+        this.archiveLogOnlyMode = connectorConfig.isArchiveLogOnlyMode();
+        this.archiveDestinationName = connectorConfig.getLogMiningArchiveDestinationName();
+    }
+
+    /**
+     * This is the loop to get changes from LogMiner.
+     * @param context change event source context
+     * @param offsetContext the offset context holding the current SCN position
+     */
+    @Override
+    public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) {
+        try (TransactionalBuffer transactionalBuffer =
+                new TransactionalBuffer(
+                        connectorConfig, schema, clock, errorHandler, streamingMetrics)) {
+            try {
+                startScn = offsetContext.getScn();
+
+                if (!isContinuousMining
+                        && startScn.compareTo(
+                                getFirstOnlineLogScn(
+                                        jdbcConnection,
+                                        archiveLogRetention,
+                                        archiveDestinationName)) < 0) {
+                    throw new DebeziumException(
+                            "Online REDO LOG files or archive log files do not contain the offset scn "
+                                    + startScn
+                                    + ".  Please perform a new snapshot.");
+                }
+
+                setNlsSessionParameters(jdbcConnection);
+                checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
+
+                if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
+                    return;
+                }
+
+                initializeRedoLogsForMining(jdbcConnection, false, startScn);
+
+                HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
+
+                try {
+                    // todo: why can't OracleConnection be used rather than a
+                    // Factory+JdbcConfiguration?
+                    historyRecorder.prepare(
+                            streamingMetrics,
+                            jdbcConfiguration,
+                            connectorConfig.getLogMinerHistoryRetentionHours());
+
+                    final LogMinerQueryResultProcessor processor =
+                            new LogMinerQueryResultProcessor(
+                                    context,
+                                    connectorConfig,
+                                    streamingMetrics,
+                                    transactionalBuffer,
+                                    offsetContext,
+                                    schema,
+                                    dispatcher,
+                                    historyRecorder);
+
+                    final String query =
+                            LogMinerQueryBuilder.build(connectorConfig, schema);
+                    try (PreparedStatement miningView =
+                            jdbcConnection
+                                    .connection()
+                                    .prepareStatement(
+                                            query,
+                                            ResultSet.TYPE_FORWARD_ONLY,
+                                            ResultSet.CONCUR_READ_ONLY,
+                                            ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
+
+                        currentRedoLogSequences = getCurrentRedoLogSequences();
+                        Stopwatch stopwatch = Stopwatch.reusable();
+                        while (context.isRunning()) {
+                            // Calculate time difference before each mining session to detect time
+                            // zone offset changes (e.g. DST) on database server
+                            streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));
+
+                            if (archiveLogOnlyMode
+                                    && !waitForStartScnInArchiveLogs(context, startScn)) {
+                                break;
+                            }
+
+                            Instant start = Instant.now();
+                            endScn =
+                                    getEndScn(
+                                            jdbcConnection,
+                                            startScn,
+                                            endScn,
+                                            streamingMetrics,
+                                            connectorConfig.getLogMiningBatchSizeDefault(),
+                                            connectorConfig.isLobEnabled(),
+                                            connectorConfig.isArchiveLogOnlyMode(),
+                                            connectorConfig.getLogMiningArchiveDestinationName());
+
+                            // There is a small window where, once archive-log-only mode
+                            // has completely caught up to the last record in the archive
+                            // logs, the start and end SCN values are identical. In this
+                            // case we want to pause and restart the loop, waiting for a
+                            // new archive log to arrive before proceeding with the next
+                            // mining session.
+                            if (archiveLogOnlyMode && startScn.equals(endScn)) {
+                                pauseBetweenMiningSessions();
+                                continue;
+                            }
+
+                            if (hasLogSwitchOccurred()) {
+                                // Restarting the mining session mitigates PGA memory leaks:
+                                // with a single long-running session, PGA usage keeps
+                                // growing, and there may be no other way to flush it.
+                                // At this point we therefore start a new mining session.
+                                LOGGER.trace(
+                                        "Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
+                                        startScn,
+                                        endScn,
+                                        offsetContext.getScn(),
+                                        strategy,
+                                        isContinuousMining);
+                                endMining(jdbcConnection);
+
+                                initializeRedoLogsForMining(jdbcConnection, true, startScn);
+
+                                abandonOldTransactionsIfExist(
+                                        jdbcConnection, offsetContext, transactionalBuffer);
+
+                                // This needs to be re-calculated because building the
+                                // data dictionary forces the current redo log sequence
+                                // to be advanced, as it triggers a complete log switch
+                                // of all the redo logs.
+                                currentRedoLogSequences = getCurrentRedoLogSequences();
+                            }
+
+                            startLogMining(
+                                    jdbcConnection,
+                                    startScn,
+                                    endScn,
+                                    strategy,
+                                    isContinuousMining,
+                                    streamingMetrics);
+
+                            LOGGER.trace(
+                                    "Fetching LogMiner view results SCN {} to {}",
+                                    startScn,
+                                    endScn);
+                            stopwatch.start();
+                            miningView.setFetchSize(connectorConfig.getMaxQueueSize());
+                            miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
+                            miningView.setString(1, startScn.toString());
+                            miningView.setString(2, endScn.toString());
+                            try (ResultSet rs = miningView.executeQuery()) {
+                                Duration lastDurationOfBatchCapturing =
+                                        stopwatch.stop().durations().statistics().getTotal();
+                                streamingMetrics.setLastDurationOfBatchCapturing(
+                                        lastDurationOfBatchCapturing);
+                                processor.processResult(rs);
+                                if (connectorConfig.isLobEnabled()) {
+                                    startScn =
+                                            transactionalBuffer.updateOffsetContext(
+                                                    offsetContext, dispatcher);
+                                } else {
+
+                                    final Scn lastProcessedScn = processor.getLastProcessedScn();
+                                    if (!lastProcessedScn.isNull()
+                                            && lastProcessedScn.compareTo(endScn) < 0) {
+                                        // If the last processed SCN is before the endScn,
+                                        // we need to use the last processed SCN as the
+                                        // next starting point, because the LGWR buffer
+                                        // has not yet flushed all entries to disk.
+                                        endScn = lastProcessedScn;
+                                    }
+
+                                    if (transactionalBuffer.isEmpty()) {
+                                        LOGGER.debug(
+                                                "Buffer is empty, updating offset SCN to {}",
+                                                endScn);
+                                        offsetContext.setScn(endScn);
+                                    } else {
+                                        final Scn minStartScn = transactionalBuffer.getMinimumScn();
+                                        if (!minStartScn.isNull()) {
+                                            offsetContext.setScn(
+                                                    minStartScn.subtract(Scn.valueOf(1)));
+                                            dispatcher.dispatchHeartbeatEvent(offsetContext);
+                                        }
+                                    }
+                                    startScn = endScn;
+                                }
+                            }
+
+                            afterHandleScn(offsetContext);
+                            streamingMetrics.setCurrentBatchProcessingTime(
+                                    Duration.between(start, Instant.now()));
+                            pauseBetweenMiningSessions();
+                        }
+                    }
+                } finally {
+                    historyRecorder.close();
+                }
+            } catch (Throwable t) {
+                logError(streamingMetrics, "Mining session stopped due to the {}", t);
+                errorHandler.setProducerThrowable(t);
+            } finally {
+                LOGGER.info(
+                        "startScn={}, endScn={}, offsetContext.getScn()={}",
+                        startScn,
+                        endScn,
+                        offsetContext.getScn());
+                LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
+                LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
+            }
+        }
+    }
+
+    protected void afterHandleScn(OracleOffsetContext offsetContext) {
+    }
+
+    private void abandonOldTransactionsIfExist(
+            OracleConnection connection,
+            OracleOffsetContext offsetContext,
+            TransactionalBuffer transactionalBuffer) {
+        Duration transactionRetention = connectorConfig.getLogMiningTransactionRetention();
+        if (!Duration.ZERO.equals(transactionRetention)) {
+            final Scn offsetScn = offsetContext.getScn();
+            Optional<Scn> lastScnToAbandonTransactions =
+                    getLastScnToAbandon(connection, offsetScn, transactionRetention);
+            lastScnToAbandonTransactions.ifPresent(
+                    thresholdScn -> {
+                        transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext);
+                        offsetContext.setScn(thresholdScn);
+                        startScn = endScn;
+                    });
+        }
+    }
+
+    private void initializeRedoLogsForMining(
+            OracleConnection connection, boolean postEndMiningSession, Scn startScn)
+            throws SQLException {
+        if (!postEndMiningSession) {
+            if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
+                buildDataDictionary(connection);
+            }
+            if (!isContinuousMining) {
+                setLogFilesForMining(
+                        connection,
+                        startScn,
+                        archiveLogRetention,
+                        archiveLogOnlyMode,
+                        archiveDestinationName);
+            }
+        } else {
+            if (!isContinuousMining) {
+                if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
+                    buildDataDictionary(connection);
+                }
+                setLogFilesForMining(
+                        connection,
+                        startScn,
+                        archiveLogRetention,
+                        archiveLogOnlyMode,
+                        archiveDestinationName);
+            }
+        }
+    }
+
+    /**
+     * Checks whether a database log switch has occurred and updates metrics if so.
+     *
+     * @return {@code true} if a log switch was detected, otherwise {@code false}
+     * @throws SQLException if a database exception occurred
+     */
+    private boolean hasLogSwitchOccurred() throws SQLException {
+        final List<BigInteger> newSequences = getCurrentRedoLogSequences();
+        if (!newSequences.equals(currentRedoLogSequences)) {
+            LOGGER.debug(
+                    "Current log sequence(s) is now {}, was {}",
+                    newSequences,
+                    currentRedoLogSequences);
+
+            currentRedoLogSequences = newSequences;
+
+            final Map<String, String> logStatuses =
+                    jdbcConnection.queryAndMap(
+                            SqlUtils.redoLogStatusQuery(),
+                            rs -> {
+                                Map<String, String> results = new LinkedHashMap<>();
+                                while (rs.next()) {
+                                    results.put(rs.getString(1), rs.getString(2));
+                                }
+                                return results;
+                            });
+
+            final int logSwitchCount =
+                    jdbcConnection.queryAndMap(
+                            SqlUtils.switchHistoryQuery(archiveDestinationName),
+                            rs -> {
+                                if (rs.next()) {
+                                    return rs.getInt(2);
+                                }
+                                return 0;
+                            });
+
+            final Set<String> fileNames = getCurrentRedoLogFiles(jdbcConnection);
+
+            streamingMetrics.setRedoLogStatus(logStatuses);
+            streamingMetrics.setSwitchCount(logSwitchCount);
+            streamingMetrics.setCurrentLogFileName(fileNames);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Get the current redo log sequence(s).
+     *
+     * <p>In an Oracle RAC environment, there are multiple current redo logs and therefore this
+     * method returns multiple values, each relating to a single RAC node in the Oracle cluster.
+     *
+     * @return list of sequence numbers
+     * @throws SQLException if a database exception occurred
+     */
+    private List<BigInteger> getCurrentRedoLogSequences() throws SQLException {
+        return jdbcConnection.queryAndMap(
+                SqlUtils.currentRedoLogSequenceQuery(),
+                rs -> {
+                    List<BigInteger> sequences = new ArrayList<>();
+                    while (rs.next()) {
+                        sequences.add(new BigInteger(rs.getString(1)));
+                    }
+                    return sequences;
+                });
+    }
+
+    private void pauseBetweenMiningSessions() throws InterruptedException {
+        Duration period =
+                Duration.ofMillis(streamingMetrics.getMillisecondToSleepBetweenMiningQuery());
+        Metronome.sleeper(period, clock).pause();
+    }
+
+    /**
+     * Waits for the starting system change number to exist in the archive logs before returning.
+     *
+     * @param context the change event source context
+     * @param startScn the starting system change number
+     * @return true if the code should continue, false if the code should end.
+     * @throws SQLException if a database exception occurred
+     * @throws InterruptedException if the pause between checks is interrupted
+     */
+    private boolean waitForStartScnInArchiveLogs(ChangeEventSourceContext context, Scn startScn)
+            throws SQLException, InterruptedException {
+        boolean showStartScnNotInArchiveLogs = true;
+        while (context.isRunning() && !isStartScnInArchiveLogs(startScn)) {
+            if (showStartScnNotInArchiveLogs) {
+                LOGGER.warn(
+                        "Starting SCN {} is not yet in archive logs, waiting for archive log switch.",
+                        startScn);
+                showStartScnNotInArchiveLogs = false;
+            }
+            Metronome.sleeper(connectorConfig.getArchiveLogOnlyScnPollTime(), clock).pause();
+        }
+
+        if (!context.isRunning()) {
+            return false;
+        }
+
+        if (!showStartScnNotInArchiveLogs) {
+            LOGGER.info(
+                    "Starting SCN {} is now available in archive logs, log mining unpaused.",
+                    startScn);
+        }
+        return true;
+    }
+
+    /**
+     * Returns whether the starting system change number is in the archive logs.
+     *
+     * @param startScn the starting system change number
+     * @return true if the starting system change number is in the archive logs; false otherwise.
+     * @throws SQLException if a database exception occurred
+     */
+    private boolean isStartScnInArchiveLogs(Scn startScn) throws SQLException {
+        List<LogFile> logs =
+                LogMinerHelper.getLogFilesForOffsetScn(
+                        jdbcConnection,
+                        startScn,
+                        archiveLogRetention,
+                        archiveLogOnlyMode,
+                        archiveDestinationName);
+        return logs.stream()
+                .anyMatch(
+                        l -> l.getFirstScn().compareTo(startScn) <= 0
+                                && l.getNextScn().compareTo(startScn) > 0
+                                && l.getType().equals(LogFile.Type.ARCHIVE));
+    }
+
+    @Override
+    public void commitOffset(Map<String, ?> offset) {
+        // nothing to do
+    }
+}
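
The key difference from the upstream class is the protected afterHandleScn hook, which execute()
invokes once per mining iteration and which is a no-op here. A minimal sketch of how a subclass
might use it; the class name and hook body below are illustrative and not part of this commit:

    public class AuditingLogMinerSource extends LogMinerStreamingChangeEventSource {

        public AuditingLogMinerSource(OracleConnectorConfig connectorConfig,
                OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher,
                ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema,
                Configuration jdbcConfig, OracleStreamingChangeEventSourceMetrics metrics) {
            super(connectorConfig, jdbcConnection, dispatcher, errorHandler, clock,
                    schema, jdbcConfig, metrics);
        }

        @Override
        protected void afterHandleScn(OracleOffsetContext offsetContext) {
            // Called after each mining loop iteration, once the offset SCN has
            // been advanced; e.g. record offsetContext.getScn() for bookkeeping.
        }
    }
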
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 40ab5780a..44bb81060 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -710,6 +710,12 @@
  Source  : flink-cdc-connectors 2.3.0 (Please note that the software have been modified.)
  License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
+ 1.3.15 inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
+        inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+
+ Source  : debezium-connector-oracle 1.6.4.Final (Please note that the software has been modified.)
+ License : https://github.com/debezium/debezium/blob/main/LICENSE.txt
+
 =======================================================================
 Apache InLong Subcomponents: