Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/01 09:17:36 UTC
[inlong] branch master updated: [INLONG-5242][Sort] Add metric computing for mongoDB and Oracle (#5298)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9038879f7 [INLONG-5242][Sort] Add metric computing for mongoDB and Oracle (#5298)
9038879f7 is described below
commit 9038879f7eb2f7e7f5f4f16cfc397c9fadd304d1
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Mon Aug 1 17:17:32 2022 +0800
[INLONG-5242][Sort] Add metric computing for mongoDB and Oracle (#5298)
---
.../org/apache/inlong/sort/base/Constants.java | 5 +
.../inlong/sort/base/metric/SinkMetricData.java | 90 ++++-
.../inlong/sort/base/metric/SourceMetricData.java | 55 ++-
inlong-sort/sort-connectors/mongodb-cdc/pom.xml | 6 +
.../sort/cdc/mongodb}/DebeziumSourceFunction.java | 55 +--
.../inlong/sort/cdc/mongodb/MongoDBSource.java | 447 +++++++++++++++++++++
.../sort/cdc/mongodb/table/MongoDBTableSource.java | 312 ++++++++++++++
.../mongodb/table/MongoDBTableSourceFactory.java | 295 ++++++++++++++
.../org.apache.flink.table.factories.Factory | 18 +
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 2 +-
.../mysql/source/config/MySqlSourceOptions.java | 2 +-
inlong-sort/sort-connectors/oracle-cdc/pom.xml | 6 +
.../sort/cdc/oracle}/DebeziumSourceFunction.java | 55 +--
.../inlong/sort/cdc/oracle/OracleSource.java | 183 +++++++++
.../sort/cdc/oracle/table/OracleTableSource.java | 246 ++++++++++++
.../oracle/table/OracleTableSourceFactory.java} | 101 ++---
.../org.apache.flink.table.factories.Factory | 18 +
inlong-sort/sort-connectors/postgres-cdc/pom.xml | 1 +
.../DebeziumSourceFunction.java | 3 +-
.../cdc/postgres/table/PostgreSQLTableFactory.java | 2 +-
licenses/inlong-sort-connectors/LICENSE | 22 +-
21 files changed, 1812 insertions(+), 112 deletions(-)
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
index c9c749ad6..bda4e7475 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -46,4 +46,9 @@ public final class Constants {
public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
+ /**
+ * Delimiter of the inlong.metric option value, which joins groupId, streamId and nodeId
+ */
+ public static final String DELIMITER = "&";
+
}
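For reference, the new DELIMITER carries the metric labels through the single inlong.metric option: the value is composed as groupId&streamId&nodeId and split back apart in the source function (see the DebeziumSourceFunction hunk further below). A minimal sketch of that round trip, with hypothetical label values:

    // Sketch only: composing and parsing the inlong.metric value.
    // "myGroup", "myStream" and "mongodb_input" are hypothetical labels.
    String inlongMetric = String.join(Constants.DELIMITER, "myGroup", "myStream", "mongodb_input");
    // inlongMetric is now "myGroup&myStream&mongodb_input"

    String[] labels = inlongMetric.split(Constants.DELIMITER);
    String groupId = labels[0];
    String streamId = labels[1];
    String nodeId = labels[2];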
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index c3af8f027..27c3a2194 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
/**
* A collection class for handling metrics
@@ -45,16 +46,66 @@ public class SinkMetricData {
this.metricGroup = metricGroup;
}
+ /**
+ * The default counter is a {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ */
public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
+ registerMetricsForNumRecordsOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom counter that implements {@link Counter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ * @param counter the counter used to record the metric value
+ */
+ public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName,
+ Counter counter) {
numRecordsOut =
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
+ .counter(metricName, counter);
}
+ /**
+ * The default counter is a {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ */
public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
+ registerMetricsForNumBytesOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom counter that implements {@link Counter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ * @param counter the counter used to record the metric value
+ */
+ public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName,
+ Counter counter) {
numBytesOut =
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
+ .counter(metricName, counter);
}
public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
@@ -73,16 +124,45 @@ public class SinkMetricData {
public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
String metricName) {
+ registerMetricsForDirtyRecords(groupId, streamId, nodeId, metricName, new SimpleCounter());
+ }
+
+ public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
+ String metricName, Counter counter) {
dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
+ .counter(metricName, counter);
}
+ /**
+ * The default counter is a {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ */
public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
String metricName) {
+ registerMetricsForDirtyBytes(groupId, streamId, nodeId, metricName, new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom counter that implements {@link Counter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ * @param counter the counter used to record the metric value
+ */
+ public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
+ String metricName, Counter counter) {
dirtyBytes =
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
-
+ .counter(metricName, counter);
}
public Counter getNumRecordsOut() {
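The overload pattern above is the same for every metric: the short form delegates to the long form with a new SimpleCounter, and callers may pass any org.apache.flink.metrics.Counter implementation instead. A hedged usage sketch follows; the MetricGroup, label values and metric-name strings are illustrative, and SourceMetricData mirrors the same pattern for its input-side metrics:

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.SinkMetricData;

    // Sketch only: metricGroup would normally come from the operator's RuntimeContext.
    void registerSinkMetrics(MetricGroup metricGroup) {
        SinkMetricData metricData = new SinkMetricData(metricGroup);

        // Short form: backed by a SimpleCounter.
        metricData.registerMetricsForNumRecordsOut("myGroup", "myStream", "mysql_output",
                "numRecordsOut");

        // Long form: supply any Counter implementation, e.g. an AtomicLong-backed
        // counter (illustrative, not part of this patch) for multi-threaded writers.
        Counter atomicCounter = new Counter() {
            private final AtomicLong count = new AtomicLong();
            @Override public void inc() { count.incrementAndGet(); }
            @Override public void inc(long n) { count.addAndGet(n); }
            @Override public void dec() { count.decrementAndGet(); }
            @Override public void dec(long n) { count.addAndGet(-n); }
            @Override public long getCount() { return count.get(); }
        };
        metricData.registerMetricsForNumBytesOut("myGroup", "myStream", "mysql_output",
                "numBytesOut", atomicCounter);
    }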
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 8a065836f..a962e2938 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
/**
* A collection class for handling metrics
@@ -42,16 +43,66 @@ public class SourceMetricData {
this.metricGroup = metricGroup;
}
+ /**
+ * The default counter is a {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ */
public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
+ registerMetricsForNumRecordsIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom counter that implements {@link Counter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ * @param counter the counter used to record the metric value
+ */
+ public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName,
+ Counter counter) {
numRecordsIn =
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
+ .counter(metricName, counter);
}
+ /**
+ * The default counter is a {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ */
public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
+ registerMetricsForNumBytesIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom counter that implements {@link Counter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a
+ * metric reporter such as Prometheus.
+ *
+ * @param groupId inlong groupId
+ * @param streamId inlong streamId
+ * @param nodeId inlong nodeId
+ * @param metricName metric name
+ * @param counter the counter used to record the metric value
+ */
+ public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName,
+ Counter counter) {
numBytesIn =
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
+ .counter(metricName, counter);
}
public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index 5ffda58ab..f4d3e6036 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -36,6 +36,11 @@
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -53,6 +58,7 @@
<configuration>
<artifactSet>
<includes>
+ <include>org.apache.inlong:*</include>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
<include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
similarity index 95%
copy from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
copy to inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 157cdb3e6..141cc5672 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.cdc.postgres;
+package org.apache.inlong.sort.cdc.mongodb;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.Validator;
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -78,10 +79,6 @@ import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -115,33 +112,40 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+ private static final long serialVersionUID = -5808108641062931623L;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
/**
* State name of the consumer's partition offset states.
*/
public static final String OFFSETS_STATE_NAME = "offset-states";
+
/**
* State name of the consumer's history records state.
*/
public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
/**
* The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
*/
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
/**
* The configuration represents the Debezium MySQL Connector uses the legacy implementation or
* not.
*/
public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
/**
* The configuration value represents legacy implementation.
*/
public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
- protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
- private static final long serialVersionUID = -5808108641062931623L;
// ---------------------------------------------------------------------------------------
// Properties
// ---------------------------------------------------------------------------------------
+
/**
* The schema to convert from Debezium's messages into Flink's objects.
*/
@@ -162,18 +166,21 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
* Data for pending but uncommitted offsets.
*/
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
- /**
- * Validator to validate the connected database satisfies the cdc connector's requirements.
- */
- private final Validator validator;
+
/**
* Flag indicating whether the Debezium Engine is started.
*/
private volatile boolean debeziumStarted = false;
+ /**
+ * Validator to validate the connected database satisfies the cdc connector's requirements.
+ */
+ private final Validator validator;
+
// ---------------------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------------------
+
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
@@ -226,7 +233,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private String inlongMetric;
- private SourceMetricData sourceMetricData;
+ private SourceMetricData metricData;
// ---------------------------------------------------------------------------------------
@@ -414,17 +421,17 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split("_");
+ String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- sourceMetricData = new SourceMetricData(metricGroup);
- sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, NUM_RECORDS_IN);
- sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, NUM_BYTES_IN);
- sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId, streamId,
- nodeId, NUM_BYTES_IN_PER_SECOND);
- sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
- nodeId, NUM_RECORDS_IN_PER_SECOND);
+ metricData = new SourceMetricData(metricGroup);
+ metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
+ metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
+ metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
+ Constants.NUM_BYTES_IN_PER_SECOND);
+ metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
+ Constants.NUM_RECORDS_IN_PER_SECOND);
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -463,9 +470,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
new DebeziumDeserializationSchema<T>() {
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
- if (sourceMetricData != null) {
- sourceMetricData.getNumRecordsIn().inc(1L);
- sourceMetricData.getNumBytesIn()
+ if (metricData != null) {
+ metricData.getNumRecordsIn().inc(1L);
+ metricData.getNumBytesIn()
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
}
deserializer.deserialize(record, out);
@@ -639,6 +646,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
}
public SourceMetricData getMetricData() {
- return sourceMetricData;
+ return metricData;
}
}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
new file mode 100644
index 000000000..ebe9bf4b5
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.kafka.connect.source.MongoSourceConfig;
+import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
+import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read snapshot and continue to consume change stream
+ * events.
+ */
+@PublicEvolving
+public class MongoDBSource {
+
+ public static final String MONGODB_SCHEME = "mongodb";
+
+ public static final String ERROR_TOLERANCE_NONE = ErrorTolerance.NONE.value();
+
+ public static final String ERROR_TOLERANCE_ALL = ErrorTolerance.ALL.value();
+
+ public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
+
+ public static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;
+
+ public static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = 1500;
+
+ public static final String HEARTBEAT_TOPIC_NAME_DEFAULT = "__mongodb_heartbeats";
+
+ public static final String OUTPUT_FORMAT_SCHEMA =
+ OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT);
+
+ // Add "source" field to adapt to debezium SourceRecord
+ public static final String OUTPUT_SCHEMA_VALUE_DEFAULT =
+ "{"
+ + " \"name\": \"ChangeStream\","
+ + " \"type\": \"record\","
+ + " \"fields\": ["
+ + " { \"name\": \"_id\", \"type\": \"string\" },"
+ + " { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"source\","
+ + " \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"ts_ms\", \"type\": \"long\"},"
+ + " {\"name\": \"snapshot\", \"type\": [\"string\", \"null\"] } ]"
+ + " }, \"null\" ] },"
+ + " { \"name\": \"ns\","
+ + " \"type\": [{\"name\": \"ns\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"db\", \"type\": \"string\"},"
+ + " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
+ + " }, \"null\" ] },"
+ + " { \"name\": \"to\","
+ + " \"type\": [{\"name\": \"to\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"db\", \"type\": \"string\"},"
+ + " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
+ + " }, \"null\" ] },"
+ + " { \"name\": \"documentKey\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"updateDescription\","
+ + " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
+ + " {\"name\": \"removedFields\","
+ + " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
+ + " }] }, \"null\"] },"
+ + " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
+ + " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
+ + " \"fields\": [ {\"name\": \"id\", \"type\": \"string\"},"
+ + " {\"name\": \"uid\", \"type\": \"string\"}] }, \"null\"] }"
+ + " ]"
+ + "}";
+
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ private static String encodeValue(String value) {
+ try {
+ return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /** Builder class of {@link MongoDBSource}. */
+ public static class Builder<T> {
+
+ private String hosts;
+ private String username;
+ private String password;
+ private List<String> databaseList;
+ private List<String> collectionList;
+ private String connectionOptions;
+ private Integer batchSize;
+ private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
+ private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
+ private Boolean copyExisting = true;
+ private Integer copyExistingMaxThreads;
+ private Integer copyExistingQueueSize;
+ private String copyExistingPipeline;
+ private Boolean errorsLogEnable;
+ private String errorsTolerance;
+ private Integer heartbeatIntervalMillis;
+ private DebeziumDeserializationSchema<T> deserializer;
+ private String inLongMetric;
+
+ /** The comma-separated list of hostname and port pairs of the MongoDB servers. */
+ public Builder<T> hosts(String hosts) {
+ this.hosts = hosts;
+ return this;
+ }
+
+ /**
+ * Ampersand-separated (&) MongoDB connection options, e.g.
+ * replicaSet=test&connectTimeoutMS=300000. See
+ * https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options
+ */
+ public Builder<T> connectionOptions(String connectionOptions) {
+ this.connectionOptions = connectionOptions;
+ return this;
+ }
+
+ /** Name of the database user to be used when connecting to MongoDB. */
+ public Builder<T> username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ /** Password to be used when connecting to MongoDB. */
+ public Builder<T> password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /** Regular expressions list that match database names to be monitored. */
+ public Builder<T> databaseList(String... databaseList) {
+ this.databaseList = Arrays.asList(databaseList);
+ return this;
+ }
+
+ /**
+ * Regular expressions that match fully-qualified collection identifiers for collections to
+ * be monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}.
+ */
+ public Builder<T> collectionList(String... collectionList) {
+ this.collectionList = Arrays.asList(collectionList);
+ return this;
+ }
+
+ /**
+ * batch.size
+ *
+ * <p>The cursor batch size. Default: 0
+ */
+ public Builder<T> batchSize(int batchSize) {
+ checkArgument(batchSize >= 0);
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ /**
+ * poll.await.time.ms
+ *
+ * <p>The amount of time to wait before checking for new results on the change stream.
+ * Default: 1500 (see {@code POLL_AWAIT_TIME_MILLIS_DEFAULT})
+ */
+ public Builder<T> pollAwaitTimeMillis(int pollAwaitTimeMillis) {
+ checkArgument(pollAwaitTimeMillis > 0);
+ this.pollAwaitTimeMillis = pollAwaitTimeMillis;
+ return this;
+ }
+
+ /**
+ * poll.max.batch.size
+ *
+ * <p>Maximum number of change stream documents to include in a single batch when polling
+ * for new data. This setting can be used to limit the amount of data buffered internally in
+ * the connector. Default: 1000
+ */
+ public Builder<T> pollMaxBatchSize(int pollMaxBatchSize) {
+ checkArgument(pollMaxBatchSize > 0);
+ this.pollMaxBatchSize = pollMaxBatchSize;
+ return this;
+ }
+
+ /**
+ * copy.existing
+ *
+ * <p>Copy existing data from source collections and convert them to Change Stream events on
+ * their respective topics. Any changes to the data that occur during the copy process are
+ * applied once the copy is completed.
+ */
+ public Builder<T> copyExisting(boolean copyExisting) {
+ this.copyExisting = copyExisting;
+ return this;
+ }
+
+ /**
+ * copy.existing.max.threads
+ *
+ * <p>The number of threads to use when performing the data copy.
+ * Default: the number of processors.
+ */
+ public Builder<T> copyExistingMaxThreads(int copyExistingMaxThreads) {
+ checkArgument(copyExistingMaxThreads > 0);
+ this.copyExistingMaxThreads = copyExistingMaxThreads;
+ return this;
+ }
+
+ /**
+ * copy.existing.queue.size
+ *
+ * <p>The max size of the queue to use when copying data. Default: 16000
+ */
+ public Builder<T> copyExistingQueueSize(int copyExistingQueueSize) {
+ checkArgument(copyExistingQueueSize > 0);
+ this.copyExistingQueueSize = copyExistingQueueSize;
+ return this;
+ }
+
+ /**
+ * copy.existing.pipeline, e.g. [ { "$match": { "closed": "false" } } ]
+ *
+ * <p>An array of JSON objects describing the pipeline operations to run when copying
+ * existing data. This can improve the use of indexes by the copying manager and make
+ * copying more efficient.
+ */
+ public Builder<T> copyExistingPipeline(String copyExistingPipeline) {
+ this.copyExistingPipeline = copyExistingPipeline;
+ return this;
+ }
+
+ /**
+ * errors.log.enable
+ *
+ * <p>Whether details of failed operations should be written to the log file. When set to
+ * true, both errors that are tolerated (determined by the errors.tolerance setting) and not
+ * tolerated are written. When set to false, errors that are tolerated are omitted.
+ */
+ public Builder<T> errorsLogEnable(boolean errorsLogEnable) {
+ this.errorsLogEnable = errorsLogEnable;
+ return this;
+ }
+
+ /**
+ * errors.tolerance
+ *
+ * <p>Whether to continue processing messages if an error is encountered. When set to none,
+ * the connector reports an error and blocks further processing of the rest of the records
+ * when it encounters an error. When set to all, the connector silently ignores any bad
+ * messages.
+ *
+ * <p>Default: "none". Accepted values: "none" or "all".
+ */
+ public Builder<T> errorsTolerance(String errorsTolerance) {
+ this.errorsTolerance = errorsTolerance;
+ return this;
+ }
+
+ /**
+ * heartbeat.interval.ms
+ *
+ * <p>The length of time in milliseconds between sending heartbeat messages. Heartbeat
+ * messages contain the post batch resume token and are sent when no source records have
+ * been published in the specified interval. This improves the resumability of the connector
+ * for low volume namespaces. Use 0 to disable.
+ */
+ public Builder<T> heartbeatIntervalMillis(int heartbeatIntervalMillis) {
+ checkArgument(heartbeatIntervalMillis >= 0);
+ this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+ return this;
+ }
+
+ /**
+ * The deserializer used to convert from consumed {@link
+ * org.apache.kafka.connect.source.SourceRecord}.
+ */
+ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ public Builder<T> inLongMetric(String inLongMetric) {
+ this.inLongMetric = inLongMetric;
+ return this;
+ }
+
+ /** Build connection uri. */
+ @VisibleForTesting
+ public ConnectionString buildConnectionUri() {
+ StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
+
+ if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+ sb.append(encodeValue(username))
+ .append(":")
+ .append(encodeValue(password))
+ .append("@");
+ }
+
+ sb.append(checkNotNull(hosts));
+
+ if (StringUtils.isNotEmpty(connectionOptions)) {
+ sb.append("/?").append(connectionOptions);
+ }
+
+ return new ConnectionString(sb.toString());
+ }
+
+ /**
+ * The properties of mongodb kafka connector.
+ * https://docs.mongodb.com/kafka-connector/current/kafka-source
+ */
+ public DebeziumSourceFunction<T> build() {
+ Properties props = new Properties();
+
+ props.setProperty(
+ "connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
+ props.setProperty("name", "mongodb_binlog_source");
+
+ ConnectionString connectionString = buildConnectionUri();
+ props.setProperty(
+ MongoSourceConfig.CONNECTION_URI_CONFIG, String.valueOf(connectionString));
+
+ if (databaseList != null) {
+ props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
+ }
+
+ if (collectionList != null) {
+ props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
+ }
+
+ props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
+ props.setProperty(
+ MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
+ String.valueOf(Boolean.FALSE));
+
+ props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_KEY_CONFIG, OUTPUT_FORMAT_SCHEMA);
+ props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, OUTPUT_FORMAT_SCHEMA);
+ props.setProperty(
+ MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG,
+ String.valueOf(Boolean.FALSE));
+ props.setProperty(
+ MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA_VALUE_DEFAULT);
+
+ if (batchSize != null) {
+ props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
+ }
+
+ if (pollAwaitTimeMillis != null) {
+ props.setProperty(
+ MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG,
+ String.valueOf(pollAwaitTimeMillis));
+ }
+
+ if (pollMaxBatchSize != null) {
+ props.setProperty(
+ MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG,
+ String.valueOf(pollMaxBatchSize));
+ }
+
+ if (errorsLogEnable != null) {
+ props.setProperty(
+ MongoSourceConfig.ERRORS_LOG_ENABLE_CONFIG,
+ String.valueOf(errorsLogEnable));
+ }
+
+ if (errorsTolerance != null) {
+ props.setProperty(MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, errorsTolerance);
+ }
+
+ if (copyExisting != null) {
+ props.setProperty(
+ MongoSourceConfig.COPY_EXISTING_CONFIG, String.valueOf(copyExisting));
+ }
+
+ if (copyExistingMaxThreads != null) {
+ props.setProperty(
+ MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG,
+ String.valueOf(copyExistingMaxThreads));
+ }
+
+ if (copyExistingQueueSize != null) {
+ props.setProperty(
+ MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG,
+ String.valueOf(copyExistingQueueSize));
+ }
+
+ if (copyExistingPipeline != null) {
+ props.setProperty(
+ MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG, copyExistingPipeline);
+ }
+
+ if (heartbeatIntervalMillis != null) {
+ props.setProperty(
+ MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
+ String.valueOf(heartbeatIntervalMillis));
+ }
+
+ props.setProperty(
+ MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME_DEFAULT);
+
+ // Let DebeziumChangeFetcher recognize heartbeat record
+ props.setProperty(
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT);
+
+ return new DebeziumSourceFunction<>(
+ deserializer, props, null, Validator.getDefaultValidator(), inLongMetric);
+ }
+ }
+}
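For orientation, a minimal sketch of wiring this builder into a DataStream job. The host, credentials, namespace and the inlong.metric value are placeholders, and JsonDebeziumDeserializationSchema comes from the ververica flink-cdc dependency; treat this as an assumption-laden example, not part of the patch:

    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.cdc.mongodb.DebeziumSourceFunction;
    import org.apache.inlong.sort.cdc.mongodb.MongoDBSource;

    public class MongoDBSourceExample {
        public static void main(String[] args) throws Exception {
            // All connection values below are hypothetical.
            DebeziumSourceFunction<String> source =
                    MongoDBSource.<String>builder()
                            .hosts("localhost:27017")
                            .username("mongouser")
                            .password("mongopw")
                            .databaseList("inventory")
                            .collectionList("inventory.orders")
                            // groupId&streamId&nodeId, joined with Constants.DELIMITER
                            .inLongMetric("myGroup&myStream&mongodb_input")
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(source).print();
            env.execute("MongoDB CDC sketch");
        }
    }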
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
new file mode 100644
index 000000000..e037ef55a
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.table;
+
+import com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema;
+import com.ververica.cdc.connectors.mongodb.table.MongoDBReadableMetadata;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.cdc.mongodb.DebeziumSourceFunction;
+import org.apache.inlong.sort.cdc.mongodb.MongoDBSource;
+
+import javax.annotation.Nullable;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.mongodb.MongoNamespace.checkCollectionNameValidity;
+import static com.mongodb.MongoNamespace.checkDatabaseNameValidity;
+import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a MongoDB change stream events source
+ * from a logical description.
+ */
+public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+ private final ResolvedSchema physicalSchema;
+ private final String hosts;
+ private final String connectionOptions;
+ private final String username;
+ private final String password;
+ private final String database;
+ private final String collection;
+ private final Boolean errorsLogEnable;
+ private final String errorsTolerance;
+ private final Boolean copyExisting;
+ private final String copyExistingPipeline;
+ private final Integer copyExistingMaxThreads;
+ private final Integer copyExistingQueueSize;
+ private final Integer pollMaxBatchSize;
+ private final Integer pollAwaitTimeMillis;
+ private final Integer heartbeatIntervalMillis;
+ private final ZoneId localTimeZone;
+
+ private final String inLongMetric;
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ public MongoDBTableSource(
+ ResolvedSchema physicalSchema,
+ String hosts,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String database,
+ @Nullable String collection,
+ @Nullable String connectionOptions,
+ @Nullable String errorsTolerance,
+ @Nullable Boolean errorsLogEnable,
+ @Nullable Boolean copyExisting,
+ @Nullable String copyExistingPipeline,
+ @Nullable Integer copyExistingMaxThreads,
+ @Nullable Integer copyExistingQueueSize,
+ @Nullable Integer pollMaxBatchSize,
+ @Nullable Integer pollAwaitTimeMillis,
+ @Nullable Integer heartbeatIntervalMillis,
+ ZoneId localTimeZone,
+ String inLongMetric) {
+ this.physicalSchema = physicalSchema;
+ this.hosts = checkNotNull(hosts);
+ this.username = username;
+ this.password = password;
+ this.database = database;
+ this.collection = collection;
+ this.connectionOptions = connectionOptions;
+ this.errorsTolerance = errorsTolerance;
+ this.errorsLogEnable = errorsLogEnable;
+ this.copyExisting = copyExisting;
+ this.copyExistingPipeline = copyExistingPipeline;
+ this.copyExistingMaxThreads = copyExistingMaxThreads;
+ this.copyExistingQueueSize = copyExistingQueueSize;
+ this.pollMaxBatchSize = pollMaxBatchSize;
+ this.pollAwaitTimeMillis = pollAwaitTimeMillis;
+ this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+ this.localTimeZone = localTimeZone;
+ this.producedDataType = physicalSchema.toPhysicalRowDataType();
+ this.metadataKeys = Collections.emptyList();
+ this.inLongMetric = inLongMetric;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ RowType physicalDataType =
+ (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+ MetadataConverter[] metadataConverters = getMetadataConverters();
+ TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+ DebeziumDeserializationSchema<RowData> deserializer =
+ new MongoDBConnectorDeserializationSchema(
+ physicalDataType, metadataConverters, typeInfo, localTimeZone);
+
+ MongoDBSource.Builder<RowData> builder =
+ MongoDBSource.<RowData>builder().hosts(hosts).deserializer(deserializer);
+
+ if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
+ // explicitly specified database and collection.
+ if (!containsRegexMetaCharacters(database)
+ && !containsRegexMetaCharacters(collection)) {
+ checkDatabaseNameValidity(database);
+ checkCollectionNameValidity(collection);
+ builder.databaseList(database);
+ builder.collectionList(database + "." + collection);
+ } else {
+ builder.databaseList(database);
+ builder.collectionList(collection);
+ }
+ } else if (StringUtils.isNotEmpty(database)) {
+ builder.databaseList(database);
+ } else if (StringUtils.isNotEmpty(collection)) {
+ builder.collectionList(collection);
+ } else {
+ // Watching all changes on the cluster by default, we do nothing here
+ }
+
+ Optional.ofNullable(username).ifPresent(builder::username);
+ Optional.ofNullable(password).ifPresent(builder::password);
+ Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
+ Optional.ofNullable(errorsLogEnable).ifPresent(builder::errorsLogEnable);
+ Optional.ofNullable(errorsTolerance).ifPresent(builder::errorsTolerance);
+ Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting);
+ Optional.ofNullable(copyExistingPipeline).ifPresent(builder::copyExistingPipeline);
+ Optional.ofNullable(copyExistingMaxThreads).ifPresent(builder::copyExistingMaxThreads);
+ Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
+ Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
+ Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
+ Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
+ Optional.ofNullable(inLongMetric).ifPresent(builder::inLongMetric);
+
+ DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+
+ return SourceFunctionProvider.of(sourceFunction, false);
+ }
+
+ protected MetadataConverter[] getMetadataConverters() {
+ if (metadataKeys.isEmpty()) {
+ return new MetadataConverter[0];
+ }
+
+ return metadataKeys.stream()
+ .map(
+ key ->
+ Stream.of(MongoDBReadableMetadata.values())
+ .filter(m -> m.getKey().equals(key))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(MongoDBReadableMetadata::getConverter)
+ .toArray(MetadataConverter[]::new);
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ return Stream.of(MongoDBReadableMetadata.values())
+ .collect(
+ Collectors.toMap(
+ MongoDBReadableMetadata::getKey,
+ MongoDBReadableMetadata::getDataType));
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.metadataKeys = metadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ MongoDBTableSource source =
+ new MongoDBTableSource(
+ physicalSchema,
+ hosts,
+ username,
+ password,
+ database,
+ collection,
+ connectionOptions,
+ errorsTolerance,
+ errorsLogEnable,
+ copyExisting,
+ copyExistingPipeline,
+ copyExistingMaxThreads,
+ copyExistingQueueSize,
+ pollMaxBatchSize,
+ pollAwaitTimeMillis,
+ heartbeatIntervalMillis,
+ localTimeZone,
+ inLongMetric);
+ source.metadataKeys = metadataKeys;
+ source.producedDataType = producedDataType;
+ return source;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MongoDBTableSource that = (MongoDBTableSource) o;
+ return Objects.equals(physicalSchema, that.physicalSchema)
+ && Objects.equals(hosts, that.hosts)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password)
+ && Objects.equals(database, that.database)
+ && Objects.equals(collection, that.collection)
+ && Objects.equals(connectionOptions, that.connectionOptions)
+ && Objects.equals(errorsTolerance, that.errorsTolerance)
+ && Objects.equals(errorsLogEnable, that.errorsLogEnable)
+ && Objects.equals(copyExisting, that.copyExisting)
+ && Objects.equals(copyExistingPipeline, that.copyExistingPipeline)
+ && Objects.equals(copyExistingMaxThreads, that.copyExistingMaxThreads)
+ && Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
+ && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
+ && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
+ && Objects.equals(heartbeatIntervalMillis, that.heartbeatIntervalMillis)
+ && Objects.equals(localTimeZone, that.localTimeZone)
+ && Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(inLongMetric, that.inLongMetric);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ physicalSchema,
+ hosts,
+ username,
+ password,
+ database,
+ collection,
+ connectionOptions,
+ errorsTolerance,
+ errorsLogEnable,
+ copyExisting,
+ copyExistingPipeline,
+ copyExistingMaxThreads,
+ copyExistingQueueSize,
+ pollMaxBatchSize,
+ pollAwaitTimeMillis,
+ heartbeatIntervalMillis,
+ localTimeZone,
+ producedDataType,
+ metadataKeys,
+ inLongMetric);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "MongoDB-CDC";
+ }
+}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
new file mode 100644
index 000000000..d7299cf19
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Factory for creating configured instance of {@link MongoDBTableSource}.
+ */
+public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
+
+ private static final String IDENTIFIER = "mongodb-cdc-inlong";
+
+ private static final String DOCUMENT_ID_FIELD = "_id";
+
+ public static final ConfigOption<String> INLONG_METRIC =
+ ConfigOptions.key("inlong.metric")
+ .stringType()
+ .defaultValue("")
+ .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+ private static final ConfigOption<String> HOSTS =
+ ConfigOptions.key("hosts")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The comma-separated list of hostname and port pairs of the MongoDB servers. "
+ + "eg. localhost:27017,localhost:27018");
+
+ private static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the database user to be used when connecting to MongoDB. "
+ + "This is required only when MongoDB is configured to use authentication.");
+
+ private static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Password to be used when connecting to MongoDB. "
+ + "This is required only when MongoDB is configured to use authentication.");
+
+ private static final ConfigOption<String> DATABASE =
+ ConfigOptions.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the database to watch for changes."
+ + "The database also supports regular expression "
+ + "to monitor multiple databases matches the regular expression."
+ + "e.g. db[0-9] .");
+
+ private static final ConfigOption<String> COLLECTION =
+ ConfigOptions.key("collection")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the collection in the database to watch for changes."
+ + "The collection also supports regular expression "
+ + "to monitor multiple collections matches fully-qualified collection identifiers."
+ + "e.g. db0\\.coll[0-9] .");
+
+ private static final ConfigOption<String> CONNECTION_OPTIONS =
+ ConfigOptions.key("connection.options")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The ampersand-separated MongoDB connection options. "
+ + "eg. replicaSet=test&connectTimeoutMS=300000");
+
+ private static final ConfigOption<String> ERRORS_TOLERANCE =
+ ConfigOptions.key("errors.tolerance")
+ .stringType()
+ .defaultValue(ERROR_TOLERANCE_NONE)
+ .withDescription(
+ "Whether to continue processing messages if an error is encountered. "
+ + "When set to none, the connector reports an error and blocks further processing "
+ + "of the rest of the records when it encounters an error. "
+ + "When set to all, the connector silently ignores any bad messages."
+ + "Accepted Values: 'none' or 'all'. Default 'none'.");
+
+ private static final ConfigOption<Boolean> ERRORS_LOG_ENABLE =
+ ConfigOptions.key("errors.log.enable")
+ .booleanType()
+ .defaultValue(Boolean.TRUE)
+ .withDescription(
+ "Whether details of failed operations should be written to the log file. "
+ + "When set to true, both errors that are tolerated (determined by the errors"
+ + ".tolerance setting) "
+ + "and not tolerated are written. When set to false, errors that are tolerated "
+ + "are omitted.");
+
+ private static final ConfigOption<Boolean> COPY_EXISTING =
+ ConfigOptions.key("copy.existing")
+ .booleanType()
+ .defaultValue(Boolean.TRUE)
+ .withDescription(
+ "Copy existing data from source collections and convert them "
+ + "to Change Stream events on their respective topics. Any changes to the data "
+ + "that occur during the copy process are applied once the copy is completed.");
+
+ private static final ConfigOption<String> COPY_EXISTING_PIPELINE =
+ ConfigOptions.key("copy.existing.pipeline")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "An array of JSON objects describing the pipeline operations "
+ + "to run when copying existing data. "
+ + "This can improve the use of indexes by the copying manager and make copying "
+ + "more efficient.");
+
+ private static final ConfigOption<Integer> COPY_EXISTING_MAX_THREADS =
+ ConfigOptions.key("copy.existing.max.threads")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The number of threads to use when performing the data copy."
+ + " Defaults to the number of processors.");
+
+ private static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
+ ConfigOptions.key("copy.existing.queue.size")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The max size of the queue to use when copying data. Defaults to 16000.");
+
+ private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
+ ConfigOptions.key("poll.max.batch.size")
+ .intType()
+ .defaultValue(POLL_MAX_BATCH_SIZE_DEFAULT)
+ .withDescription(
+ "Maximum number of change stream documents "
+ + "to include in a single batch when polling for new data. "
+ + "This setting can be used to limit the amount of data buffered internally in "
+ + "the connector. "
+ + "Defaults to 1000.");
+
+ private static final ConfigOption<Integer> POLL_AWAIT_TIME_MILLIS =
+ ConfigOptions.key("poll.await.time.ms")
+ .intType()
+ .defaultValue(POLL_AWAIT_TIME_MILLIS_DEFAULT)
+ .withDescription(
+ "The amount of time to wait before checking for new results on the change stream."
+ + "Defaults: 1500.");
+
+ private static final ConfigOption<Integer> HEARTBEAT_INTERVAL_MILLIS =
+ ConfigOptions.key("heartbeat.interval.ms")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The length of time in milliseconds between sending heartbeat messages."
+ + "Heartbeat messages contain the post batch resume token and are sent when no "
+ + "source records "
+ + "have been published in the specified interval. This improves the resumability "
+ + "of the connector "
+ + "for low volume namespaces. Use 0 to disable. Defaults to 0.");
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ final ReadableConfig config = helper.getOptions();
+
+ String hosts = config.get(HOSTS);
+ String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
+
+ String username = config.getOptional(USERNAME).orElse(null);
+ String password = config.getOptional(PASSWORD).orElse(null);
+
+ String database = config.getOptional(DATABASE).orElse(null);
+ String collection = config.getOptional(COLLECTION).orElse(null);
+
+ String errorsTolerance = config.get(ERRORS_TOLERANCE);
+ Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
+
+ Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
+ Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
+
+ Integer heartbeatIntervalMillis =
+ config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);
+
+ Boolean copyExisting = config.get(COPY_EXISTING);
+ String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
+ Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
+ Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
+
+ String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
+ ZoneId localTimeZone =
+ TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
+ ? ZoneId.systemDefault()
+ : ZoneId.of(zoneId);
+ String inLongMetric = config.get(INLONG_METRIC);
+
+ ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+ checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
+ checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field");
+
+ return new MongoDBTableSource(
+ physicalSchema,
+ hosts,
+ username,
+ password,
+ database,
+ collection,
+ connectionOptions,
+ errorsTolerance,
+ errorsLogEnable,
+ copyExisting,
+ copyExistingPipeline,
+ copyExistingMaxThreads,
+ copyExistingQueueSize,
+ pollMaxBatchSize,
+ pollAwaitTimeMillis,
+ heartbeatIntervalMillis,
+ localTimeZone,
+ inLongMetric);
+ }
+
+ private void checkPrimaryKey(UniqueConstraint pk, String message) {
+ checkArgument(
+ pk.getColumns().size() == 1 && pk.getColumns().contains(DOCUMENT_ID_FIELD),
+ message);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOSTS);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(CONNECTION_OPTIONS);
+ options.add(DATABASE);
+ options.add(COLLECTION);
+ options.add(ERRORS_TOLERANCE);
+ options.add(ERRORS_LOG_ENABLE);
+ options.add(COPY_EXISTING);
+ options.add(COPY_EXISTING_PIPELINE);
+ options.add(COPY_EXISTING_MAX_THREADS);
+ options.add(COPY_EXISTING_QUEUE_SIZE);
+ options.add(POLL_MAX_BATCH_SIZE);
+ options.add(POLL_AWAIT_TIME_MILLIS);
+ options.add(HEARTBEAT_INTERVAL_MILLIS);
+ options.add(INLONG_METRIC);
+ return options;
+ }
+}
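For context, a minimal sketch of how this factory might be exercised from a Flink job. The factory identifier constant is defined outside this excerpt, so 'mongodb-cdc-inlong' is assumed by analogy with the 'oracle-cdc-inlong' and 'postgres-cdc-inlong' identifiers below; host, credential, and metric values are hypothetical. Note that the primary key must be the _id field, as enforced by checkPrimaryKey above.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MongoCdcExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Every option key below comes from the ConfigOptions declared in this factory;
            // only 'hosts' is required.
            tEnv.executeSql(
                    "CREATE TABLE mongo_orders ("
                            + "  _id STRING,"
                            + "  amount DECIMAL(10, 2),"
                            + "  PRIMARY KEY (_id) NOT ENFORCED" // must be the _id field
                            + ") WITH ("
                            + "  'connector' = 'mongodb-cdc-inlong'," // assumed identifier
                            + "  'hosts' = 'localhost:27017',"
                            + "  'username' = 'flinkuser',"
                            + "  'password' = 'flinkpw',"
                            + "  'database' = 'mydb',"
                            + "  'collection' = 'orders',"
                            + "  'poll.max.batch.size' = '1000',"
                            + "  'inlong.metric' = 'group01&stream01&node01'"
                            + ")");
            tEnv.executeSql("SELECT * FROM mongo_orders").print();
        }
    }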
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/mongodb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..9f9de2b35
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.cdc.mongodb.table.MongoDBTableSourceFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 3bdf92831..3724d2a8e 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -145,7 +145,7 @@ public class MySqlSource<T>
configFactory.createConfig(readerContext.getIndexOfSubtask());
String inlongMetric = sourceConfig.getInlongMetric();
if (StringUtils.isNotEmpty(inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split("_");
+ String[] inlongMetricArray = inlongMetric.split("&");
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
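The change above fixes the delimiter used to split the inlong.metric label into its three positional parts. A tiny self-contained sketch of that contract, with hypothetical values:

    public class InlongMetricLabel {
        public static void main(String[] args) {
            // inlong.metric is a '&'-delimited triple: groupId, streamId, nodeId.
            String inlongMetric = "group01&stream01&node01"; // hypothetical value
            String[] parts = inlongMetric.split("&");        // Constants.DELIMITER is "&"
            System.out.println("groupId="  + parts[0]);      // group01
            System.out.println("streamId=" + parts[1]);      // stream01
            System.out.println("nodeId="   + parts[2]);      // node01
        }
    }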
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index f936ac8d1..ca32dd4a9 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -34,7 +34,7 @@ public class MySqlSourceOptions {
ConfigOptions.key("inlong.metric")
.stringType()
.defaultValue("")
- .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+ .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
public static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
diff --git a/inlong-sort/sort-connectors/oracle-cdc/pom.xml b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
index 1db604385..d31530e64 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
@@ -39,6 +39,11 @@
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -56,6 +61,7 @@
<configuration>
<artifactSet>
<includes>
+ <include>org.apache.inlong:*</include>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
<include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
similarity index 95%
copy from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
copy to inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 157cdb3e6..166f7f1b7 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.cdc.postgres;
+package org.apache.inlong.sort.cdc.oracle;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.Validator;
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -78,10 +79,6 @@ import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -115,33 +112,40 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+ private static final long serialVersionUID = -5808108641062931623L;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
/**
* State name of the consumer's partition offset states.
*/
public static final String OFFSETS_STATE_NAME = "offset-states";
+
/**
* State name of the consumer's history records state.
*/
public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
/**
* The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
*/
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
/**
* The configuration represents the Debezium MySQL Connector uses the legacy implementation or
* not.
*/
public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
/**
* The configuration value represents legacy implementation.
*/
public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
- protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
- private static final long serialVersionUID = -5808108641062931623L;
// ---------------------------------------------------------------------------------------
// Properties
// ---------------------------------------------------------------------------------------
+
/**
* The schema to convert from Debezium's messages into Flink's objects.
*/
@@ -162,18 +166,21 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
* Data for pending but uncommitted offsets.
*/
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
- /**
- * Validator to validate the connected database satisfies the cdc connector's requirements.
- */
- private final Validator validator;
+
/**
* Flag indicating whether the Debezium Engine is started.
*/
private volatile boolean debeziumStarted = false;
+ /**
+ * Validator to validate the connected database satisfies the cdc connector's requirements.
+ */
+ private final Validator validator;
+
// ---------------------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------------------
+
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
@@ -226,7 +233,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private String inlongMetric;
- private SourceMetricData sourceMetricData;
+ private SourceMetricData metricData;
// ---------------------------------------------------------------------------------------
@@ -414,17 +421,17 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split("_");
+ String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- sourceMetricData = new SourceMetricData(metricGroup);
- sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, NUM_RECORDS_IN);
- sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, NUM_BYTES_IN);
- sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId, streamId,
- nodeId, NUM_BYTES_IN_PER_SECOND);
- sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
- nodeId, NUM_RECORDS_IN_PER_SECOND);
+ metricData = new SourceMetricData(metricGroup);
+ metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
+ metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
+ metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
+ Constants.NUM_BYTES_IN_PER_SECOND);
+ metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
+ Constants.NUM_RECORDS_IN_PER_SECOND);
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -463,9 +470,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
new DebeziumDeserializationSchema<T>() {
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
- if (sourceMetricData != null) {
- sourceMetricData.getNumRecordsIn().inc(1L);
- sourceMetricData.getNumBytesIn()
+ if (metricData != null) {
+ metricData.getNumRecordsIn().inc(1L);
+ metricData.getNumBytesIn()
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
}
deserializer.deserialize(record, out);
@@ -639,6 +646,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
}
public SourceMetricData getMetricData() {
- return sourceMetricData;
+ return metricData;
}
}
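The hunk above wraps the user-supplied deserializer so that every record increments the record and byte counters before being delegated. A standalone, hedged sketch of that decorator pattern (the handler interface and AtomicLong counters are simplified stand-ins for DebeziumDeserializationSchema and the Flink metric types):

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.atomic.AtomicLong;

    /** Hypothetical stand-in for the wrapped deserializer shown in the hunk above. */
    interface RecordHandler {
        void handle(Object recordValue) throws Exception;
    }

    class CountingHandler implements RecordHandler {
        private final RecordHandler delegate;
        final AtomicLong numRecordsIn = new AtomicLong();
        final AtomicLong numBytesIn = new AtomicLong();

        CountingHandler(RecordHandler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void handle(Object recordValue) throws Exception {
            // Same ordering as the hunk: count first, then delegate.
            numRecordsIn.incrementAndGet();
            numBytesIn.addAndGet(
                    recordValue.toString().getBytes(StandardCharsets.UTF_8).length);
            delegate.handle(recordValue);
        }
    }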
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
new file mode 100644
index 000000000..f9253f295
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle;
+
+import com.ververica.cdc.connectors.oracle.OracleValidator;
+import com.ververica.cdc.connectors.oracle.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import io.debezium.connector.oracle.OracleConnector;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction that can read a snapshot and then continue consuming redo log entries via LogMiner.
+ */
+public class OracleSource {
+
+ private static final String DATABASE_SERVER_NAME = "oracle_logminer";
+
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ /** Builder class of {@link OracleSource}. */
+ public static class Builder<T> {
+
+ private int port = 1521; // default 1521 port
+ private String hostname;
+ private String database;
+ private String username;
+ private String password;
+ private String[] tableList;
+ private String[] schemaList;
+ private Properties dbzProperties;
+ private StartupOptions startupOptions = StartupOptions.initial();
+ private DebeziumDeserializationSchema<T> deserializer;
+ private String inLongMetric;
+
+ public Builder<T> hostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ /** Integer port number of the Oracle database server. */
+ public Builder<T> port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * Name of the Oracle database to capture changes from; it is passed to Debezium as the
+ * "database.dbname" property and is required.
+ */
+ public Builder<T> database(String database) {
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match fully-qualified table identifiers for
+ * tables to be monitored; any table not included in the list will be excluded from
+ * monitoring. Each identifier is of the form schemaName.tableName. By default the connector
+ * will monitor every non-system table in each monitored database.
+ */
+ public Builder<T> tableList(String... tableList) {
+ this.tableList = tableList;
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match schema names to be monitored; any
+ * schema name not included in the whitelist will be excluded from monitoring. By default
+ * all non-system schemas will be monitored.
+ */
+ public Builder<T> schemaList(String... schemaList) {
+ this.schemaList = schemaList;
+ return this;
+ }
+
+ /** Username to use when connecting to the Oracle database server. */
+ public Builder<T> username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ /** Password to use when connecting to the Oracle database server. */
+ public Builder<T> password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /** The Debezium Oracle connector properties. For example, "snapshot.mode". */
+ public Builder<T> debeziumProperties(Properties properties) {
+ this.dbzProperties = properties;
+ return this;
+ }
+
+ /**
+ * The deserializer used to convert from consumed {@link
+ * org.apache.kafka.connect.source.SourceRecord}.
+ */
+ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ /** Specifies the startup options. */
+ public Builder<T> startupOptions(StartupOptions startupOptions) {
+ this.startupOptions = startupOptions;
+ return this;
+ }
+
+ public Builder<T> inLongMetric(String inLongMetric) {
+ this.inLongMetric = inLongMetric;
+ return this;
+ }
+
+ public DebeziumSourceFunction<T> build() {
+ Properties props = new Properties();
+ props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
+ // Logical name that identifies and provides a namespace for the particular Oracle
+ // database server being monitored. The logical name should be unique across all other
+ // connectors, since it is used as a prefix for all Kafka topic names emanating from
+ // this connector. Only alphanumeric characters and underscores should be used.
+ props.setProperty("database.server.name", DATABASE_SERVER_NAME);
+ props.setProperty("database.hostname", checkNotNull(hostname));
+ props.setProperty("database.user", checkNotNull(username));
+ props.setProperty("database.password", checkNotNull(password));
+ props.setProperty("database.port", String.valueOf(port));
+ props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
+ props.setProperty("database.dbname", checkNotNull(database));
+ if (schemaList != null) {
+ props.setProperty("schema.whitelist", String.join(",", schemaList));
+ }
+ if (tableList != null) {
+ props.setProperty("table.include.list", String.join(",", tableList));
+ }
+
+ DebeziumOffset specificOffset = null;
+ switch (startupOptions.startupMode) {
+ case INITIAL:
+ props.setProperty("snapshot.mode", "initial");
+ break;
+
+ case LATEST_OFFSET:
+ props.setProperty("snapshot.mode", "schema_only");
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ if (dbzProperties != null) {
+ props.putAll(dbzProperties);
+ }
+
+ return new DebeziumSourceFunction<>(
+ deserializer, props, specificOffset, new OracleValidator(props), inLongMetric);
+ }
+ }
+}
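A hedged usage sketch of the builder above in a DataStream job. Connection values are hypothetical, and JsonDebeziumDeserializationSchema is assumed to be available from the flink-cdc-connectors debezium package; any DebeziumDeserializationSchema would do:

    import com.ververica.cdc.connectors.oracle.table.StartupOptions;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.cdc.oracle.DebeziumSourceFunction;
    import org.apache.inlong.sort.cdc.oracle.OracleSource;

    public class OracleCdcExample {
        public static void main(String[] args) throws Exception {
            DebeziumSourceFunction<String> source =
                    OracleSource.<String>builder()
                            .hostname("localhost")  // hypothetical values throughout
                            .port(1521)
                            .database("XE")
                            .schemaList("INVENTORY")
                            .tableList("INVENTORY.PRODUCTS")
                            .username("flinkuser")
                            .password("flinkpw")
                            .startupOptions(StartupOptions.initial()) // snapshot, then LogMiner
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .inLongMetric("group01&stream01&node01")
                            .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(source).print();
            env.execute("oracle-cdc-example");
        }
    }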
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
new file mode 100644
index 000000000..36eead3b7
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.table;
+
+import com.ververica.cdc.connectors.oracle.table.OracleDeserializationConverterFactory;
+import com.ververica.cdc.connectors.oracle.table.OracleReadableMetaData;
+import com.ververica.cdc.connectors.oracle.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.cdc.oracle.DebeziumSourceFunction;
+import org.apache.inlong.sort.cdc.oracle.OracleSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create an Oracle redo log source from a
+ * logical description.
+ */
+public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+ private final ResolvedSchema physicalSchema;
+ private final int port;
+ private final String hostname;
+ private final String database;
+ private final String username;
+ private final String password;
+ private final String tableName;
+ private final String schemaName;
+ private final Properties dbzProperties;
+ private final StartupOptions startupOptions;
+ private final String inLongMetric;
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ public OracleTableSource(
+ ResolvedSchema physicalSchema,
+ int port,
+ String hostname,
+ String database,
+ String tableName,
+ String schemaName,
+ String username,
+ String password,
+ Properties dbzProperties,
+ StartupOptions startupOptions,
+ String inLongMetric) {
+ this.physicalSchema = physicalSchema;
+ this.port = port;
+ this.hostname = checkNotNull(hostname);
+ this.database = checkNotNull(database);
+ this.tableName = checkNotNull(tableName);
+ this.schemaName = checkNotNull(schemaName);
+ this.username = checkNotNull(username);
+ this.password = checkNotNull(password);
+ this.dbzProperties = dbzProperties;
+ this.startupOptions = startupOptions;
+ this.producedDataType = physicalSchema.toPhysicalRowDataType();
+ this.metadataKeys = Collections.emptyList();
+ this.inLongMetric = inLongMetric;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ RowType physicalDataType =
+ (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+ MetadataConverter[] metadataConverters = getMetadataConverters();
+ TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+ DebeziumDeserializationSchema<RowData> deserializer =
+ RowDataDebeziumDeserializeSchema.newBuilder()
+ .setPhysicalRowType(physicalDataType)
+ .setMetadataConverters(metadataConverters)
+ .setResultTypeInfo(typeInfo)
+ .setUserDefinedConverterFactory(
+ OracleDeserializationConverterFactory.instance())
+ .build();
+ OracleSource.Builder<RowData> builder =
+ OracleSource.<RowData>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(database)
+ .tableList(schemaName + "." + tableName)
+ .schemaList(schemaName)
+ .username(username)
+ .password(password)
+ .debeziumProperties(dbzProperties)
+ .startupOptions(startupOptions)
+ .deserializer(deserializer)
+ .inLongMetric(inLongMetric);
+ DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+
+ return SourceFunctionProvider.of(sourceFunction, false);
+ }
+
+ private MetadataConverter[] getMetadataConverters() {
+ if (metadataKeys.isEmpty()) {
+ return new MetadataConverter[0];
+ }
+
+ return metadataKeys.stream()
+ .map(
+ key ->
+ Stream.of(OracleReadableMetaData.values())
+ .filter(m -> m.getKey().equals(key))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(OracleReadableMetaData::getConverter)
+ .toArray(MetadataConverter[]::new);
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ OracleTableSource source =
+ new OracleTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ tableName,
+ schemaName,
+ username,
+ password,
+ dbzProperties,
+ startupOptions,
+ inLongMetric);
+ source.metadataKeys = metadataKeys;
+ source.producedDataType = producedDataType;
+ return source;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OracleTableSource that = (OracleTableSource) o;
+ return port == that.port
+ && Objects.equals(physicalSchema, that.physicalSchema)
+ && Objects.equals(hostname, that.hostname)
+ && Objects.equals(database, that.database)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(schemaName, that.schemaName)
+ && Objects.equals(dbzProperties, that.dbzProperties)
+ && Objects.equals(startupOptions, that.startupOptions)
+ && Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(inLongMetric, that.inLongMetric);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ username,
+ password,
+ tableName,
+ schemaName,
+ dbzProperties,
+ startupOptions,
+ producedDataType,
+ metadataKeys,
+ inLongMetric);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Oracle-CDC";
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ return Stream.of(OracleReadableMetaData.values())
+ .collect(
+ Collectors.toMap(
+ OracleReadableMetaData::getKey,
+ OracleReadableMetaData::getDataType));
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.metadataKeys = metadataKeys;
+ this.producedDataType = producedDataType;
+ }
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
similarity index 63%
copy from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
copy to inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index 7826202ac..c42642a9e 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -16,11 +16,14 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.cdc.postgres.table;
+package org.apache.inlong.sort.cdc.oracle.table;
+import com.ververica.cdc.connectors.oracle.table.StartupOptions;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
@@ -29,120 +32,103 @@ import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
-import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
/**
- * Factory for creating configured instance of
- * {@link com.ververica.cdc.connectors.postgres.table.PostgreSQLTableSource}.
+ * Factory for creating configured instance of {@link OracleTableSource}.
*/
-public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
+public class OracleTableSourceFactory implements DynamicTableSourceFactory {
- private static final String IDENTIFIER = "postgres-cdc-inlong";
+ private static final String IDENTIFIER = "oracle-cdc-inlong";
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric")
.stringType()
.defaultValue("")
- .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+ .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
private static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
- .withDescription("IP address or hostname of the PostgreSQL database server.");
+ .withDescription("IP address or hostname of the Oracle database server.");
private static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
- .defaultValue(5432)
- .withDescription("Integer port number of the PostgreSQL database server.");
+ .defaultValue(1521)
+ .withDescription("Integer port number of the Oracle database server.");
private static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
- "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server"
- + ".");
+ "Name of the Oracle database to use when connecting to the Oracle database server.");
private static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription(
- "Password to use when connecting to the PostgreSQL database server.");
+ "Password to use when connecting to the oracle database server.");
private static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
- .withDescription("Database name of the PostgreSQL server to monitor.");
+ .withDescription("Database name of the Oracle server to monitor.");
private static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
- .withDescription("Schema name of the PostgreSQL database to monitor.");
+ .withDescription("Schema name of the Oracle database to monitor.");
private static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
- .withDescription("Table name of the PostgreSQL database to monitor.");
+ .withDescription("Table name of the Oracle database to monitor.");
- private static final ConfigOption<String> DECODING_PLUGIN_NAME =
- ConfigOptions.key("decoding.plugin.name")
+ public static final ConfigOption<String> SCAN_STARTUP_MODE =
+ ConfigOptions.key("scan.startup.mode")
.stringType()
- .defaultValue("decoderbufs")
+ .defaultValue("initial")
.withDescription(
- "The name of the Postgres logical decoding plug-in installed on the server.\n"
- + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
- + "wal2json_rds_streaming and pgoutput.");
-
- private static final ConfigOption<String> SLOT_NAME =
- ConfigOptions.key("slot.name")
- .stringType()
- .defaultValue("flink")
- .withDescription(
- "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
- + "from a particular plug-in for a particular database/schema. The server uses "
- + "this slot "
- + "to stream events to the connector that you are configuring. Default is "
- + "\"flink\".");
+ "Optional startup mode for Oracle CDC consumer, valid enumerations are "
+ + "\"initial\", \"latest-offset\"");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
- helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+ helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
- String schemaName = config.get(SCHEMA_NAME);
String tableName = config.get(TABLE_NAME);
+ String schemaName = config.get(SCHEMA_NAME);
int port = config.get(PORT);
- String pluginName = config.get(DECODING_PLUGIN_NAME);
- String slotName = config.get(SLOT_NAME);
+ StartupOptions startupOptions = getStartupOptions(config);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
- String inlongMetric = config.get(INLONG_METRIC);
+ String inLongMetric = config.get(INLONG_METRIC);
- return new PostgreSQLTableSource(
+ return new OracleTableSource(
physicalSchema,
port,
hostname,
databaseName,
- schemaName,
tableName,
+ schemaName,
username,
password,
- pluginName,
- slotName,
getDebeziumProperties(context.getCatalogTable().getOptions()),
- inlongMetric);
+ startupOptions,
+ inLongMetric);
}
@Override
@@ -157,8 +143,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
options.add(USERNAME);
options.add(PASSWORD);
options.add(DATABASE_NAME);
- options.add(SCHEMA_NAME);
options.add(TABLE_NAME);
+ options.add(SCHEMA_NAME);
return options;
}
@@ -166,9 +152,32 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
- options.add(DECODING_PLUGIN_NAME);
- options.add(SLOT_NAME);
+ options.add(SCAN_STARTUP_MODE);
options.add(INLONG_METRIC);
return options;
}
+
+ private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+ private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+
+ private static StartupOptions getStartupOptions(ReadableConfig config) {
+ String modeString = config.get(SCAN_STARTUP_MODE);
+
+ switch (modeString.toLowerCase()) {
+ case SCAN_STARTUP_MODE_VALUE_INITIAL:
+ return StartupOptions.initial();
+
+ case SCAN_STARTUP_MODE_VALUE_LATEST:
+ return StartupOptions.latest();
+
+ default:
+ throw new ValidationException(
+ String.format(
+ "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
+ SCAN_STARTUP_MODE.key(),
+ SCAN_STARTUP_MODE_VALUE_INITIAL,
+ SCAN_STARTUP_MODE_VALUE_LATEST,
+ modeString));
+ }
+ }
}
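As with the MongoDB sketch earlier, a minimal SQL-level example of this factory; the 'oracle-cdc-inlong' identifier comes from the diff above, while connection values are hypothetical:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class OracleCdcSqlExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            tEnv.executeSql(
                    "CREATE TABLE products ("
                            + "  ID INT,"
                            + "  NAME STRING"
                            + ") WITH ("
                            + "  'connector' = 'oracle-cdc-inlong',"
                            + "  'hostname' = 'localhost',"   // hypothetical values
                            + "  'port' = '1521',"
                            + "  'username' = 'flinkuser',"
                            + "  'password' = 'flinkpw',"
                            + "  'database-name' = 'XE',"
                            + "  'schema-name' = 'INVENTORY',"
                            + "  'table-name' = 'PRODUCTS',"
                            + "  'scan.startup.mode' = 'initial'," // or 'latest-offset'
                            + "  'inlong.metric' = 'group01&stream01&node01'"
                            + ")");
            tEnv.executeSql("SELECT * FROM products").print();
        }
    }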
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..247510821
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.cdc.oracle.table.OracleTableSourceFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/postgres-cdc/pom.xml b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
index 12b2ceae8..c7c3faae9 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
@@ -63,6 +63,7 @@
<configuration>
<artifactSet>
<includes>
+ <include>org.apache.inlong:*</include>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
<include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index 157cdb3e6..cc02fd1e2 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -414,7 +415,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split("_");
+ String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index 7826202ac..985de3208 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -44,7 +44,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
ConfigOptions.key("inlong.metric")
.stringType()
.defaultValue("")
- .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+ .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
private static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 9a86ef2ca..73bf1dcdd 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -436,6 +436,14 @@
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+ inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+ inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+ inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+ inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+ inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
+ inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+ inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
+ inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
Source : flink-cdc-connectors 2.2.1 (Please note that the software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
@@ -486,13 +494,13 @@
Source : flink-connector-hbase-2.2 1.13.5 (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE
- 1.3.6 inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
- inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
- inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
- inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
- inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
- Source : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
- License : https://github.com/apache/iceberg/LICENSE
+ 1.3.6 inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+ Source : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
+ License : https://github.com/apache/iceberg/LICENSE
=======================================================================
Apache InLong Subcomponents: