Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/26 01:35:47 UTC
[inlong] branch master updated: [INLONG-7909][Sort] Fix Oracle CDC cannot capture ddl changes (#7917)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0b70a1591 [INLONG-7909][Sort] Fix Oracle CDC cannot capture ddl changes (#7917)
0b70a1591 is described below
commit 0b70a1591a09cf098c679af51080ddce27148b81
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Wed Apr 26 09:35:41 2023 +0800
[INLONG-7909][Sort] Fix Oracle CDC cannot capture ddl changes (#7917)
---
inlong-sort/sort-connectors/oracle-cdc/pom.xml | 1 +
.../oracle/logminer/LogMinerQueryBuilder.java | 241 ++++++++++
.../LogMinerStreamingChangeEventSource.java | 525 +++++++++++++++++++++
licenses/inlong-sort-connectors/LICENSE | 6 +
4 files changed, 773 insertions(+)
diff --git a/inlong-sort/sort-connectors/oracle-cdc/pom.xml b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
index f097196a4..5c3de2611 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
@@ -125,6 +125,7 @@
<artifact>org.apache.inlong:sort-connector-*</artifact>
<includes>
<include>org/apache/inlong/**</include>
+ <include>io/debezium/connector/oracle/**</include>
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
</includes>
</filter>
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
new file mode 100644
index 000000000..6cff235fd
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer;
+
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
+import io.debezium.util.Strings;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * A builder that is responsible for producing the query to be executed against the LogMiner view.
+ *
+ * @author Chris Cranford
+ * Copied from 'debezium-connector-oracle 1.6.4.Final' to fix the issue where DDL changes made by the
+ * connector user could not be captured.
+ */
+public class LogMinerQueryBuilder {
+
+ private static final String LOGMNR_CONTENTS_VIEW = "V$LOGMNR_CONTENTS";
+
+ /**
+ * Builds the LogMiner contents view query.
+ *
+ * The returned query contains 2 bind parameters that the caller is responsible for binding before
+ * executing the query. The first bind parameter is the exclusive lower bound of the SCN mining
+ * window; the second is the inclusive upper bound.
+ *
+ * The built query relies on the following columns from V$LOGMNR_CONTENTS:
+ * <pre>
+ * SCN - the system change number at which the change was made
+ * SQL_REDO - the reconstructed SQL statement that initiated the change
+ * OPERATION - the database operation type name
+ * OPERATION_CODE - the database operation numeric code
+ * TIMESTAMP - the time when the change event occurred
+ * XID - the transaction identifier the change participated in
+ * CSF - the continuation flag, identifies rows that should be processed together as a single row, 0=no, 1=yes
+ * TABLE_NAME - the name of the table the change applies to
+ * SEG_OWNER - the name of the schema the change applies to
+ * USERNAME - the name of the database user that caused the change
+ * ROW_ID - the unique identifier of the changed row; may not always be set to a valid value
+ * ROLLBACK - the rollback flag, value of 0 or 1; 1 implies the row was rolled back
+ * RS_ID - the rollback segment identifier from which the change record was recorded
+ * </pre>
+ *
+ * @param connectorConfig connector configuration, should not be {@code null}
+ * @param schema database schema, should not be {@code null}
+ * @return the SQL string to be used to fetch changes from Oracle LogMiner
+ */
+ public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
+ final StringBuilder query = new StringBuilder(1024);
+ query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, ");
+ query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID ");
+ query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");
+
+ // These bind parameters will be bound when the query is executed by the caller.
+ query.append("WHERE SCN > ? AND SCN <= ? ");
+
+ // Restrict to configured PDB if one is supplied
+ final String pdbName = connectorConfig.getPdbName();
+ if (!Strings.isNullOrEmpty(pdbName)) {
+ query.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' ");
+ }
+
+ query.append("AND (");
+
+ // Always include START, COMMIT, MISSING_SCN, and ROLLBACK operations
+ query.append("(OPERATION_CODE IN (6,7,34,36)");
+
+ if (!schema.storeOnlyCapturedTables()) {
+ // In this mode, the connector will always be fed DDL operations for all tables even if they
+ // are not part of the inclusion/exclusion lists.
+ query.append(" OR ").append(buildDdlPredicate()).append(" ");
+ // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
+ if (connectorConfig.isLobEnabled()) {
+ query.append(") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) ");
+ } else {
+ query.append(") OR (OPERATION_CODE IN (1,2,3) ");
+ }
+ } else {
+ // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
+ if (connectorConfig.isLobEnabled()) {
+ query.append(") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) ");
+ } else {
+ query.append(") OR ((OPERATION_CODE IN (1,2,3) ");
+ }
+ // In this mode, the connector will filter DDL operations based on the table inclusion/exclusion lists
+ query.append("OR ").append(buildDdlPredicate()).append(") ");
+ }
+
+ // Always ignore the flush table
+ query.append("AND TABLE_NAME != '").append(SqlUtils.LOGMNR_FLUSH_TABLE).append("' ");
+
+ // There are some common schemas that we automatically ignore when building the runtime Filter
+ // predicates and we put that same list of schemas here and apply those in the generated SQL.
+ if (!OracleConnectorConfig.EXCLUDED_SCHEMAS.isEmpty()) {
+ query.append("AND SEG_OWNER NOT IN (");
+ for (Iterator<String> i = OracleConnectorConfig.EXCLUDED_SCHEMAS.iterator(); i.hasNext();) {
+ String excludedSchema = i.next();
+ query.append("'").append(excludedSchema.toUpperCase()).append("'");
+ if (i.hasNext()) {
+ query.append(",");
+ }
+ }
+ query.append(") ");
+ }
+
+ String schemaPredicate = buildSchemaPredicate(connectorConfig);
+ if (!Strings.isNullOrEmpty(schemaPredicate)) {
+ query.append("AND ").append(schemaPredicate).append(" ");
+ }
+
+ String tablePredicate = buildTablePredicate(connectorConfig);
+ if (!Strings.isNullOrEmpty(tablePredicate)) {
+ query.append("AND ").append(tablePredicate).append(" ");
+ }
+
+ query.append("))");
+
+ return query.toString();
+ }
+
+ /**
+ * Builds a common SQL fragment used to obtain DDL operations via LogMiner.
+ *
+ * @return predicate that can be used to obtain DDL operations via LogMiner
+ */
+ private static String buildDdlPredicate() {
+ final StringBuilder predicate = new StringBuilder(256);
+ predicate.append("(OPERATION_CODE = 5 ");
+ predicate.append("AND USERNAME NOT IN ('SYS','SYSTEM') ");
+ predicate.append("AND INFO NOT LIKE 'INTERNAL DDL%' ");
+ predicate.append("AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))");
+ return predicate.toString();
+ }
+
+ /**
+ * Builds a SQL predicate of what schemas to include/exclude based on the connector configuration.
+ *
+ * @param connectorConfig connector configuration, should not be {@code null}
+ * @return SQL predicate to filter results based on schema include/exclude configurations
+ */
+ private static String buildSchemaPredicate(OracleConnectorConfig connectorConfig) {
+ StringBuilder predicate = new StringBuilder();
+ if (Strings.isNullOrEmpty(connectorConfig.schemaIncludeList())) {
+ if (!Strings.isNullOrEmpty(connectorConfig.schemaExcludeList())) {
+ List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaExcludeList(), 0);
+ predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", true)).append(")");
+ }
+ } else {
+ List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaIncludeList(), 0);
+ predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", false)).append(")");
+ }
+ return predicate.toString();
+ }
+
+ /**
+ * Builds a SQL predicate of what tables to include/exclude based on the connector configuration.
+ *
+ * @param connectorConfig connector configuration, should not be {@code null}
+ * @return SQL predicate to filter results based on table include/exclude configuration
+ */
+ private static String buildTablePredicate(OracleConnectorConfig connectorConfig) {
+ StringBuilder predicate = new StringBuilder();
+ if (Strings.isNullOrEmpty(connectorConfig.tableIncludeList())) {
+ if (!Strings.isNullOrEmpty(connectorConfig.tableExcludeList())) {
+ List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableExcludeList(), 0);
+ predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", true))
+ .append(")");
+ }
+ } else {
+ List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableIncludeList(), 0);
+ predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", false))
+ .append(")");
+ }
+ return predicate.toString();
+ }
+
+ /**
+ * Takes a list of reg-ex patterns and builds an Oracle-specific predicate using {@code REGEXP_LIKE}
+ * in order to take the connector configuration include/exclude lists and assemble them as SQL
+ * predicates.
+ *
+ * @param patterns list of the individual include/exclude reg-ex patterns from the connector configuration
+ * @param columnName the column against which the reg-ex patterns are applied
+ * @param inclusion should be {@code true} when passing exclusion patterns, so that each match is
+ * negated and combined with {@code AND}; {@code false} when passing inclusion patterns
+ * @return the assembled SQL predicate
+ */
+ private static String listOfPatternsToSql(List<Pattern> patterns, String columnName, boolean inclusion) {
+ StringBuilder predicate = new StringBuilder();
+ for (Iterator<Pattern> i = patterns.iterator(); i.hasNext();) {
+ Pattern pattern = i.next();
+ if (inclusion) {
+ predicate.append("NOT ");
+ }
+ // NOTE: The REGEXP_LIKE operator was added in Oracle 10g (10.1.0.0.0)
+ final String text = resolveRegExpLikePattern(pattern);
+ predicate.append("REGEXP_LIKE(").append(columnName).append(",'").append(text).append("','i')");
+ if (i.hasNext()) {
+ // Exclude patterns are negated and combined via AND; include patterns are combined via OR
+ predicate.append(inclusion ? " AND " : " OR ");
+ }
+ }
+ return predicate.toString();
+ }
+
+ /**
+ * The Oracle {@code REGEXP_LIKE} operator behaves like the {@code LIKE} operator with implicit
+ * leading and trailing wildcards: a pattern may match anywhere within the column value. The
+ * include/exclude lists are meant to match the full name, so an implied "^" and "$" anchor is
+ * added for start/end, ensuring that the reg-ex "DEBEZIUM" does not mistakenly match "DEBEZIUM2".
+ *
+ * @param pattern the pattern to be analyzed, should not be {@code null}
+ * @return the pattern text, anchored with "^" and "$" if they are not already present
+ */
+ private static String resolveRegExpLikePattern(Pattern pattern) {
+ String text = pattern.pattern();
+ if (!text.startsWith("^")) {
+ text = "^" + text;
+ }
+ if (!text.endsWith("$")) {
+ text += "$";
+ }
+ return text;
+ }
+}
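[Editor's illustration, not part of the commit] The sketch below replays the anchoring rule of resolveRegExpLikePattern and the assembly rule of listOfPatternsToSql from the diff above as a self-contained Java demo; the class LogMinerQueryBuilderDemo and its method names are hypothetical.

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class LogMinerQueryBuilderDemo {

        // Mirrors resolveRegExpLikePattern: anchor with "^" and "$" unless already present.
        static String anchor(Pattern pattern) {
            String text = pattern.pattern();
            if (!text.startsWith("^")) {
                text = "^" + text;
            }
            if (!text.endsWith("$")) {
                text = text + "$";
            }
            return text;
        }

        // Mirrors listOfPatternsToSql: negated matches joined by AND for exclusions,
        // plain matches joined by OR for inclusions.
        static String toPredicate(List<Pattern> patterns, String columnName, boolean negate) {
            return patterns.stream()
                    .map(p -> (negate ? "NOT " : "")
                            + "REGEXP_LIKE(" + columnName + ",'" + anchor(p) + "','i')")
                    .collect(Collectors.joining(negate ? " AND " : " OR "));
        }

        public static void main(String[] args) {
            List<Pattern> includes = Arrays.asList(
                    Pattern.compile("DEBEZIUM"), Pattern.compile("INVENTORY"));
            // Without the anchors, 'DEBEZIUM' would also match the schema 'DEBEZIUM2'.
            System.out.println(toPredicate(includes, "SEG_OWNER", false));
            // REGEXP_LIKE(SEG_OWNER,'^DEBEZIUM$','i') OR REGEXP_LIKE(SEG_OWNER,'^INVENTORY$','i')
        }
    }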
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
new file mode 100644
index 000000000..868fa73b2
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer;
+
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.buildDataDictionary;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.checkSupplementalLogging;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.endMining;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getCurrentRedoLogFiles;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getEndScn;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getFirstOnlineLogScn;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getLastScnToAbandon;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.getSystime;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.instantiateFlushConnections;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.logError;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.setNlsSessionParameters;
+import static io.debezium.connector.oracle.logminer.LogMinerHelper.startLogMining;
+
+import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
+import io.debezium.connector.oracle.OracleConnection;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
+import io.debezium.connector.oracle.OracleOffsetContext;
+import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
+import io.debezium.connector.oracle.Scn;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import io.debezium.util.Stopwatch;
+import java.math.BigInteger;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility. The event handler loop
+ * is executed in a separate executor. Compared to the upstream class, this copy adds the
+ * {@code afterHandleScn} hook method.
+ * Copied from 'debezium-connector-oracle 1.6.4.Final' to fix the issue where DDL changes made by the
+ * connector user could not be captured.
+ */
+public class LogMinerStreamingChangeEventSource
+ implements
+ StreamingChangeEventSource<OracleOffsetContext> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(LogMinerStreamingChangeEventSource.class);
+
+ private final OracleConnection jdbcConnection;
+ private final EventDispatcher<TableId> dispatcher;
+ private final Clock clock;
+ private final OracleDatabaseSchema schema;
+ private final boolean isRac;
+ private final Set<String> racHosts = new HashSet<>();
+ private final JdbcConfiguration jdbcConfiguration;
+ private final OracleConnectorConfig.LogMiningStrategy strategy;
+ private final ErrorHandler errorHandler;
+ private final boolean isContinuousMining;
+ private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
+ private final OracleConnectorConfig connectorConfig;
+ private final Duration archiveLogRetention;
+ private final boolean archiveLogOnlyMode;
+ private final String archiveDestinationName;
+
+ private Scn startScn;
+ private Scn endScn;
+ private List<BigInteger> currentRedoLogSequences;
+
+ public LogMinerStreamingChangeEventSource(
+ OracleConnectorConfig connectorConfig,
+ OracleConnection jdbcConnection,
+ EventDispatcher<TableId> dispatcher,
+ ErrorHandler errorHandler,
+ Clock clock,
+ OracleDatabaseSchema schema,
+ Configuration jdbcConfig,
+ OracleStreamingChangeEventSourceMetrics streamingMetrics) {
+ this.jdbcConnection = jdbcConnection;
+ this.dispatcher = dispatcher;
+ this.clock = clock;
+ this.schema = schema;
+ this.connectorConfig = connectorConfig;
+ this.strategy = connectorConfig.getLogMiningStrategy();
+ this.isContinuousMining = connectorConfig.isContinuousMining();
+ this.errorHandler = errorHandler;
+ this.streamingMetrics = streamingMetrics;
+ this.jdbcConfiguration = JdbcConfiguration.adapt(jdbcConfig);
+ this.isRac = connectorConfig.isRacSystem();
+ if (this.isRac) {
+ this.racHosts.addAll(
+ connectorConfig.getRacNodes().stream()
+ .map(String::toUpperCase)
+ .collect(Collectors.toSet()));
+ instantiateFlushConnections(jdbcConfiguration, racHosts);
+ }
+ this.archiveLogRetention = connectorConfig.getLogMiningArchiveLogRetention();
+ this.archiveLogOnlyMode = connectorConfig.isArchiveLogOnlyMode();
+ this.archiveDestinationName = connectorConfig.getLogMiningArchiveDestinationName();
+ }
+
+ /**
+ * This is the loop to get changes from LogMiner.
+ *
+ * @param context change event source context
+ */
+ @Override
+ public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) {
+ try (TransactionalBuffer transactionalBuffer =
+ new TransactionalBuffer(
+ connectorConfig, schema, clock, errorHandler, streamingMetrics)) {
+ try {
+ startScn = offsetContext.getScn();
+
+ if (!isContinuousMining
+ && startScn.compareTo(
+ getFirstOnlineLogScn(
+ jdbcConnection,
+ archiveLogRetention,
+ archiveDestinationName)) < 0) {
+ throw new DebeziumException(
+ "Online REDO LOG files or archive log files do not contain the offset scn "
+ + startScn
+ + ". Please perform a new snapshot.");
+ }
+
+ setNlsSessionParameters(jdbcConnection);
+ checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
+
+ if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
+ return;
+ }
+
+ initializeRedoLogsForMining(jdbcConnection, false, startScn);
+
+ HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
+
+ try {
+ // todo: why can't OracleConnection be used rather than a
+ // Factory+JdbcConfiguration?
+ historyRecorder.prepare(
+ streamingMetrics,
+ jdbcConfiguration,
+ connectorConfig.getLogMinerHistoryRetentionHours());
+
+ final LogMinerQueryResultProcessor processor =
+ new LogMinerQueryResultProcessor(
+ context,
+ connectorConfig,
+ streamingMetrics,
+ transactionalBuffer,
+ offsetContext,
+ schema,
+ dispatcher,
+ historyRecorder);
+
+ final String query =
+ LogMinerQueryBuilder.build(connectorConfig, schema);
+ try (PreparedStatement miningView =
+ jdbcConnection
+ .connection()
+ .prepareStatement(
+ query,
+ ResultSet.TYPE_FORWARD_ONLY,
+ ResultSet.CONCUR_READ_ONLY,
+ ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
+
+ currentRedoLogSequences = getCurrentRedoLogSequences();
+ Stopwatch stopwatch = Stopwatch.reusable();
+ while (context.isRunning()) {
+ // Calculate time difference before each mining session to detect time
+ // zone offset changes (e.g. DST) on database server
+ streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));
+
+ if (archiveLogOnlyMode
+ && !waitForStartScnInArchiveLogs(context, startScn)) {
+ break;
+ }
+
+ Instant start = Instant.now();
+ endScn =
+ getEndScn(
+ jdbcConnection,
+ startScn,
+ endScn,
+ streamingMetrics,
+ connectorConfig.getLogMiningBatchSizeDefault(),
+ connectorConfig.isLobEnabled(),
+ connectorConfig.isArchiveLogOnlyMode(),
+ connectorConfig.getLogMiningArchiveDestinationName());
+
+ // There is a small window in archive-log-only mode where, once mining has
+ // completely caught up to the last record in the archive logs, the start and
+ // end SCN values are identical. In that case we pause and restart the loop,
+ // waiting for a new archive log before proceeding.
+ if (archiveLogOnlyMode && startScn.equals(endScn)) {
+ pauseBetweenMiningSessions();
+ continue;
+ }
+
+ if (hasLogSwitchOccurred()) {
+ // Starting a new mining session mitigates PGA memory leaks: with a single
+ // long-lived session the PGA keeps growing, and there may be no other way
+ // to flush it.
+ LOGGER.trace(
+ "Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
+ startScn,
+ endScn,
+ offsetContext.getScn(),
+ strategy,
+ isContinuousMining);
+ endMining(jdbcConnection);
+
+ initializeRedoLogsForMining(jdbcConnection, true, startScn);
+
+ abandonOldTransactionsIfExist(
+ jdbcConnection, offsetContext, transactionalBuffer);
+
+ // This needs to be re-calculated because building the data
+ // dictionary will force the
+ // current redo log sequence to be advanced due to a complete log
+ // switch of all logs.
+ currentRedoLogSequences = getCurrentRedoLogSequences();
+ }
+
+ startLogMining(
+ jdbcConnection,
+ startScn,
+ endScn,
+ strategy,
+ isContinuousMining,
+ streamingMetrics);
+
+ LOGGER.trace(
+ "Fetching LogMiner view results SCN {} to {}",
+ startScn,
+ endScn);
+ stopwatch.start();
+ miningView.setFetchSize(connectorConfig.getMaxQueueSize());
+ miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
+ miningView.setString(1, startScn.toString());
+ miningView.setString(2, endScn.toString());
+ try (ResultSet rs = miningView.executeQuery()) {
+ Duration lastDurationOfBatchCapturing =
+ stopwatch.stop().durations().statistics().getTotal();
+ streamingMetrics.setLastDurationOfBatchCapturing(
+ lastDurationOfBatchCapturing);
+ processor.processResult(rs);
+ if (connectorConfig.isLobEnabled()) {
+ startScn =
+ transactionalBuffer.updateOffsetContext(
+ offsetContext, dispatcher);
+ } else {
+
+ final Scn lastProcessedScn = processor.getLastProcessedScn();
+ if (!lastProcessedScn.isNull()
+ && lastProcessedScn.compareTo(endScn) < 0) {
+ // If the last processed SCN is before the endScn we need to
+ // use the last processed SCN as the
+ // next starting point as the LGWR buffer didn't flush all
+ // entries from memory to disk yet.
+ endScn = lastProcessedScn;
+ }
+
+ if (transactionalBuffer.isEmpty()) {
+ LOGGER.debug(
+ "Buffer is empty, updating offset SCN to {}",
+ endScn);
+ offsetContext.setScn(endScn);
+ } else {
+ final Scn minStartScn = transactionalBuffer.getMinimumScn();
+ if (!minStartScn.isNull()) {
+ offsetContext.setScn(
+ minStartScn.subtract(Scn.valueOf(1)));
+ dispatcher.dispatchHeartbeatEvent(offsetContext);
+ }
+ }
+ startScn = endScn;
+ }
+ }
+
+ afterHandleScn(offsetContext);
+ streamingMetrics.setCurrentBatchProcessingTime(
+ Duration.between(start, Instant.now()));
+ pauseBetweenMiningSessions();
+ }
+ }
+ } finally {
+ historyRecorder.close();
+ }
+ } catch (Throwable t) {
+ logError(streamingMetrics, "Mining session stopped due to the {}", t);
+ errorHandler.setProducerThrowable(t);
+ } finally {
+ LOGGER.info(
+ "startScn={}, endScn={}, offsetContext.getScn()={}",
+ startScn,
+ endScn,
+ offsetContext.getScn());
+ LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
+ LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
+ }
+ }
+ }
+
+ protected void afterHandleScn(OracleOffsetContext offsetContext) {
+ }
+
+ private void abandonOldTransactionsIfExist(
+ OracleConnection connection,
+ OracleOffsetContext offsetContext,
+ TransactionalBuffer transactionalBuffer) {
+ Duration transactionRetention = connectorConfig.getLogMiningTransactionRetention();
+ if (!Duration.ZERO.equals(transactionRetention)) {
+ final Scn offsetScn = offsetContext.getScn();
+ Optional<Scn> lastScnToAbandonTransactions =
+ getLastScnToAbandon(connection, offsetScn, transactionRetention);
+ lastScnToAbandonTransactions.ifPresent(
+ thresholdScn -> {
+ transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext);
+ offsetContext.setScn(thresholdScn);
+ startScn = endScn;
+ });
+ }
+ }
+
+ private void initializeRedoLogsForMining(
+ OracleConnection connection, boolean postEndMiningSession, Scn startScn)
+ throws SQLException {
+ if (!postEndMiningSession) {
+ if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
+ buildDataDictionary(connection);
+ }
+ if (!isContinuousMining) {
+ setLogFilesForMining(
+ connection,
+ startScn,
+ archiveLogRetention,
+ archiveLogOnlyMode,
+ archiveDestinationName);
+ }
+ } else {
+ if (!isContinuousMining) {
+ if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
+ buildDataDictionary(connection);
+ }
+ setLogFilesForMining(
+ connection,
+ startScn,
+ archiveLogRetention,
+ archiveLogOnlyMode,
+ archiveDestinationName);
+ }
+ }
+ }
+
+ /**
+ * Checks whether a database log switch has occurred and updates metrics if so.
+ *
+ * @return {@code true} if a log switch was detected, otherwise {@code false}
+ * @throws SQLException if a database exception occurred
+ */
+ private boolean hasLogSwitchOccurred() throws SQLException {
+ final List<BigInteger> newSequences = getCurrentRedoLogSequences();
+ if (!newSequences.equals(currentRedoLogSequences)) {
+ LOGGER.debug(
+ "Current log sequence(s) is now {}, was {}",
+ newSequences,
+ currentRedoLogSequences);
+
+ currentRedoLogSequences = newSequences;
+
+ final Map<String, String> logStatuses =
+ jdbcConnection.queryAndMap(
+ SqlUtils.redoLogStatusQuery(),
+ rs -> {
+ Map<String, String> results = new LinkedHashMap<>();
+ while (rs.next()) {
+ results.put(rs.getString(1), rs.getString(2));
+ }
+ return results;
+ });
+
+ final int logSwitchCount =
+ jdbcConnection.queryAndMap(
+ SqlUtils.switchHistoryQuery(archiveDestinationName),
+ rs -> {
+ if (rs.next()) {
+ return rs.getInt(2);
+ }
+ return 0;
+ });
+
+ final Set<String> fileNames = getCurrentRedoLogFiles(jdbcConnection);
+
+ streamingMetrics.setRedoLogStatus(logStatuses);
+ streamingMetrics.setSwitchCount(logSwitchCount);
+ streamingMetrics.setCurrentLogFileName(fileNames);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Get the current redo log sequence(s).
+ *
+ * <p>In an Oracle RAC environment, there are multiple current redo logs and therefore this
+ * method returns multiple values, each relating to a single RAC node in the Oracle cluster.
+ *
+ * @return list of sequence numbers
+ * @throws SQLException if a database exception occurred
+ */
+ private List<BigInteger> getCurrentRedoLogSequences() throws SQLException {
+ return jdbcConnection.queryAndMap(
+ SqlUtils.currentRedoLogSequenceQuery(),
+ rs -> {
+ List<BigInteger> sequences = new ArrayList<>();
+ while (rs.next()) {
+ sequences.add(new BigInteger(rs.getString(1)));
+ }
+ return sequences;
+ });
+ }
+
+ private void pauseBetweenMiningSessions() throws InterruptedException {
+ Duration period =
+ Duration.ofMillis(streamingMetrics.getMillisecondToSleepBetweenMiningQuery());
+ Metronome.sleeper(period, clock).pause();
+ }
+
+ /**
+ * Waits for the starting system change number to exist in the archive logs before returning.
+ *
+ * @param context the change event source context
+ * @param startScn the starting system change number
+ * @return true if the code should continue, false if the code should end.
+ * @throws SQLException if a database exception occurred
+ * @throws InterruptedException if the pause between checks is interrupted
+ */
+ private boolean waitForStartScnInArchiveLogs(ChangeEventSourceContext context, Scn startScn)
+ throws SQLException, InterruptedException {
+ boolean showStartScnNotInArchiveLogs = true;
+ while (context.isRunning() && !isStartScnInArchiveLogs(startScn)) {
+ if (showStartScnNotInArchiveLogs) {
+ LOGGER.warn(
+ "Starting SCN {} is not yet in archive logs, waiting for archive log switch.",
+ startScn);
+ showStartScnNotInArchiveLogs = false;
+ }
+ Metronome.sleeper(connectorConfig.getArchiveLogOnlyScnPollTime(), clock).pause();
+ }
+
+ if (!context.isRunning()) {
+ return false;
+ }
+
+ if (!showStartScnNotInArchiveLogs) {
+ LOGGER.info(
+ "Starting SCN {} is now available in archive logs, log mining unpaused.",
+ startScn);
+ }
+ return true;
+ }
+
+ /**
+ * Returns whether the starting system change number is in the archive logs.
+ *
+ * @param startScn the starting system change number
+ * @return true if the starting system change number is in the archive logs; false otherwise.
+ * @throws SQLException if a database exception occurred
+ */
+ private boolean isStartScnInArchiveLogs(Scn startScn) throws SQLException {
+ List<LogFile> logs =
+ LogMinerHelper.getLogFilesForOffsetScn(
+ jdbcConnection,
+ startScn,
+ archiveLogRetention,
+ archiveLogOnlyMode,
+ archiveDestinationName);
+ return logs.stream()
+ .anyMatch(
+ l -> l.getFirstScn().compareTo(startScn) <= 0
+ && l.getNextScn().compareTo(startScn) > 0
+ && l.getType().equals(LogFile.Type.ARCHIVE));
+ }
+
+ @Override
+ public void commitOffset(Map<String, ?> offset) {
+ // nothing to do
+ }
+}
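[Editor's illustration, not part of the commit] The main behavioral addition in this copy is the protected afterHandleScn hook, invoked at the end of each mining iteration in execute. A minimal sketch of how a subclass could use it follows; the class name ReportingLogMinerSource and its logging body are illustrative assumptions, not code from this commit.

    import io.debezium.config.Configuration;
    import io.debezium.connector.oracle.OracleConnection;
    import io.debezium.connector.oracle.OracleConnectorConfig;
    import io.debezium.connector.oracle.OracleDatabaseSchema;
    import io.debezium.connector.oracle.OracleOffsetContext;
    import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
    import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
    import io.debezium.pipeline.ErrorHandler;
    import io.debezium.pipeline.EventDispatcher;
    import io.debezium.relational.TableId;
    import io.debezium.util.Clock;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical subclass demonstrating the afterHandleScn extension point.
    public class ReportingLogMinerSource extends LogMinerStreamingChangeEventSource {

        private static final Logger LOG =
                LoggerFactory.getLogger(ReportingLogMinerSource.class);

        public ReportingLogMinerSource(
                OracleConnectorConfig connectorConfig,
                OracleConnection jdbcConnection,
                EventDispatcher<TableId> dispatcher,
                ErrorHandler errorHandler,
                Clock clock,
                OracleDatabaseSchema schema,
                Configuration jdbcConfig,
                OracleStreamingChangeEventSourceMetrics streamingMetrics) {
            super(connectorConfig, jdbcConnection, dispatcher, errorHandler, clock, schema,
                    jdbcConfig, streamingMetrics);
        }

        @Override
        protected void afterHandleScn(OracleOffsetContext offsetContext) {
            // Called once per mining iteration, after the current batch has been processed
            // and the offset context updated; report the latest handled SCN here.
            LOG.debug("Handled LogMiner batch up to SCN {}", offsetContext.getScn());
        }
    }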
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 40ab5780a..44bb81060 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -710,6 +710,12 @@
Source : flink-cdc-connectors 2.3.0 (Please note that the software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+ 1.3.15 inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
+ inlong-sort/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+
+ Source : debezium-connector-oracle 1.6.4.Final (Please note that the software have been modified.)
+ License : https://github.com/debezium/debezium/blob/main/LICENSE.txt
+
=======================================================================
Apache InLong Subcomponents: