You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/09 10:35:22 UTC
[inlong] branch master updated: [INLONG-6176][Agent] Support collect data from Oracle (#6203)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cc3da3797 [INLONG-6176][Agent] Support collect data from Oracle (#6203)
cc3da3797 is described below
commit cc3da379709a767e277a756500371aa2333946bd
Author: haibo.duan <dh...@live.cn>
AuthorDate: Wed Nov 9 18:35:16 2022 +0800
[INLONG-6176][Agent] Support collect data from Oracle (#6203)
---
.../inlong/agent/constant/OracleConstants.java | 50 ++
.../apache/inlong/agent/pojo/JobProfileDto.java | 53 +-
.../org/apache/inlong/agent/pojo/OracleJob.java | 75 +++
inlong-agent/agent-plugins/pom.xml | 5 +
.../inlong/agent/plugin/sources/OracleSource.java | 48 ++
.../plugin/sources/reader/AbstractReader.java | 35 ++
.../agent/plugin/sources/reader/BinlogReader.java | 32 +-
.../agent/plugin/sources/reader/MongoDBReader.java | 32 +-
.../{SQLServerReader.java => OracleReader.java} | 634 ++++++++++-----------
.../plugin/sources/reader/PostgreSQLReader.java | 32 +-
.../plugin/sources/reader/SQLServerReader.java | 34 +-
.../sources/snapshot/OracleSnapshotBase.java | 52 ++
.../agent/plugin/sources/TestOracleConnect.java | 62 ++
.../agent/plugin/sources/TestOracleReader.java | 234 ++++++++
.../agent/plugin/sources/TestOracleSource.java | 90 +++
pom.xml | 7 +
16 files changed, 1037 insertions(+), 438 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java
new file mode 100644
index 000000000..6ba84a7c3
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+public class OracleConstants {
+
+ /**
+ * The snapshot includes the structure and data of the captured tables.
+ * Specify this value to populate topics with a complete representation of the data from the captured tables.
+ */
+ public static final String INITIAL = "initial";
+
+ /**
+ * The snapshot includes the structure and data of the captured tables.
+ * The connector performs an initial snapshot and then stops, without processing any subsequent changes.
+ */
+ public static final String INITIAL_ONLY = "initial_only";
+
+ /**
+ * The snapshot includes only the structure of captured tables.
+ * Specify this value if you want the connector to capture data only for changes that occur after the snapshot.
+ */
+ public static final String SCHEMA_ONLY = "schema_only";
+
+ /**
+ * This is a recovery setting for a connector that has already been capturing changes.
+ * When you restart the connector, this setting enables recovery of a corrupted or lost database history topic.
+ * You might set it periodically to "clean up" a database history topic that has been growing unexpectedly.
+ * Database history topics require infinite retention. Note this mode is only safe to be used when it is guaranteed
+ * that no schema changes happened since the point in time the connector was shut down before and the point in time
+ * the snapshot is taken.
+ */
+ public static final String SCHEMA_ONLY_RECOVERY = "schema_only_recovery";
+
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 810a42d2b..f967dd1a8 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -54,6 +54,10 @@ public class JobProfileDto {
* mongo source
*/
public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource";
+ /**
+ * oracle source
+ */
+ public static final String ORACLE_SOURCE = "org.apache.inlong.agent.plugin.sources.OracleSource";
/**
* mqtt source
*/
@@ -230,10 +234,10 @@ public class JobProfileDto {
return mongoJob;
}
- private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
- SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
- SqlServerJob.SqlserverJobConfig.class);
- SqlServerJob oracleJob = new SqlServerJob();
+ private static OracleJob getOracleJob(DataConfig dataConfigs) {
+ OracleJob.OracleJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+ OracleJob.OracleJobConfig.class);
+ OracleJob oracleJob = new OracleJob();
oracleJob.setUser(config.getUser());
oracleJob.setHostname(config.getHostname());
oracleJob.setPassword(config.getPassword());
@@ -241,23 +245,51 @@ public class JobProfileDto {
oracleJob.setServerName(config.getServerName());
oracleJob.setDbname(config.getDbname());
- SqlServerJob.Offset offset = new SqlServerJob.Offset();
+ OracleJob.Offset offset = new OracleJob.Offset();
offset.setFilename(config.getOffsetFilename());
offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
oracleJob.setOffset(offset);
- SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+ OracleJob.Snapshot snapshot = new OracleJob.Snapshot();
snapshot.setMode(config.getSnapshotMode());
oracleJob.setSnapshot(snapshot);
- SqlServerJob.History history = new SqlServerJob.History();
+ OracleJob.History history = new OracleJob.History();
history.setFilename(config.getHistoryFilename());
oracleJob.setHistory(history);
return oracleJob;
}
+ private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+ SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+ SqlServerJob.SqlserverJobConfig.class);
+ SqlServerJob sqlServerJob = new SqlServerJob();
+ sqlServerJob.setUser(config.getUser());
+ sqlServerJob.setHostname(config.getHostname());
+ sqlServerJob.setPassword(config.getPassword());
+ sqlServerJob.setPort(config.getPort());
+ sqlServerJob.setServerName(config.getServerName());
+ sqlServerJob.setDbname(config.getDbname());
+
+ SqlServerJob.Offset offset = new SqlServerJob.Offset();
+ offset.setFilename(config.getOffsetFilename());
+ offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+ offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+ sqlServerJob.setOffset(offset);
+
+ SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+ snapshot.setMode(config.getSnapshotMode());
+ sqlServerJob.setSnapshot(snapshot);
+
+ SqlServerJob.History history = new SqlServerJob.History();
+ history.setFilename(config.getHistoryFilename());
+ sqlServerJob.setHistory(history);
+
+ return sqlServerJob;
+ }
+
public static MqttJob getMqttJob(DataConfig dataConfigs) {
MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
MqttJob.MqttJobConfig.class);
@@ -341,6 +373,12 @@ public class JobProfileDto {
job.setSource(KAFKA_SOURCE);
profileDto.setJob(job);
break;
+ case ORACLE:
+ OracleJob oracleJob = getOracleJob(dataConfig);
+ job.setOracleJob(oracleJob);
+ job.setSource(ORACLE_SOURCE);
+ profileDto.setJob(job);
+ break;
case SQLSERVER:
SqlServerJob sqlserverJob = getSqlServerJob(dataConfig);
job.setSqlserverJob(sqlserverJob);
@@ -385,6 +423,7 @@ public class JobProfileDto {
private FileJob fileJob;
private BinlogJob binlogJob;
private KafkaJob kafkaJob;
+ private OracleJob oracleJob;
private MongoJob mongoJob;
private MqttJob mqttJob;
private SqlServerJob sqlserverJob;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java
new file mode 100644
index 000000000..ef2420b3a
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class OracleJob {
+
+ private String hostname;
+ private String user;
+ private String password;
+ private String port;
+ private String serverName;
+ private String dbname;
+
+ private OracleJob.Snapshot snapshot;
+ private OracleJob.Offset offset;
+ private OracleJob.History history;
+
+ @Data
+ public static class Offset {
+
+ private String intervalMs;
+ private String filename;
+ private String specificOffsetFile;
+ private String specificOffsetPos;
+ }
+
+ @Data
+ public static class Snapshot {
+
+ private String mode;
+ }
+
+ @Data
+ public static class History {
+
+ private String filename;
+ }
+
+ @Data
+ public static class OracleJobConfig {
+
+ private String hostname;
+ private String user;
+ private String password;
+ private String port;
+ private String dbname;
+ private String serverName;
+
+ private String snapshotMode;
+ private String intervalMs;
+ private String offsetFilename;
+ private String historyFilename;
+
+ private String specificOffsetFile;
+ private String specificOffsetPos;
+ }
+}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index fa8e03a41..85520f508 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -85,6 +85,11 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-oracle</artifactId>
+ </dependency>
+
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
new file mode 100644
index 000000000..b78f18dac
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Oracle source; {@link #split(JobProfile)} returns a list holding a single {@link OracleReader}.
+ */
+public class OracleSource extends AbstractSource {
+
+ private static final Logger logger = LoggerFactory.getLogger(OracleSource.class);
+
+ public OracleSource() {
+ }
+
+ @Override
+ public List<Reader> split(JobProfile conf) {
+ super.init(conf);
+ Reader oracleReader = new OracleReader();
+ List<Reader> readerList = new ArrayList<>();
+ readerList.add(oracleReader);
+ sourceMetric.sourceSuccessCount.incrementAndGet();
+ return readerList;
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index 5736f21b9..115070dc4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -21,8 +21,14 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
import org.apache.inlong.common.metric.MetricRegister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -49,6 +55,8 @@ public abstract class AbstractReader implements Reader {
protected String metricName;
protected Map<String, String> dimensions;
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractReader.class);
+
@Override
public void init(JobProfile jobConf) {
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
@@ -68,4 +76,31 @@ public abstract class AbstractReader implements Reader {
public String getInlongGroupId() {
return inlongGroupId;
}
+
+ /**
+ * Serializes a specific Debezium offset for the given server into a UTF-8 string.
+ *
+ * @param server server name, stored under "server" in the source partition
+ * @param file specific offset file, stored under "file" in the source offset
+ * @param pos specific offset pos, stored under "pos" in the source offset
+ * @return the serialized offset, or an empty string if serialization throws an IOException (the error is logged)
+ */
+ public String serializeOffset(final String server, final String file,
+ final String pos) {
+ Map<String, Object> sourceOffset = new HashMap<>();
+ sourceOffset.put("file", file);
+ sourceOffset.put("pos", pos);
+ DebeziumOffset specificOffset = new DebeziumOffset();
+ specificOffset.setSourceOffset(sourceOffset);
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put("server", server);
+ specificOffset.setSourcePartition(sourcePartition);
+ byte[] serializedOffset = new byte[0];
+ try {
+ serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+ } catch (IOException e) {
+ LOGGER.error("serialize offset message error", e);
+ }
+ return new String(serializedOffset, StandardCharsets.UTF_8);
+ }
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 38877951b..fa61c0a10 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -35,14 +35,11 @@ import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -215,8 +212,13 @@ public class BinlogReader extends AbstractReader {
props.setProperty("offset.storage.file.filename", offsetStoreFileName);
props.setProperty("database.history.file.filename", databaseStoreHistoryName);
if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+ Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
+ Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
- props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+ props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+ serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
} else {
props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
@@ -235,28 +237,6 @@ public class BinlogReader extends AbstractReader {
return props;
}
- private String serializeOffset() {
- Map<String, Object> sourceOffset = new HashMap<>();
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
- sourceOffset.put("file", specificOffsetFile);
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
- sourceOffset.put("pos", specificOffsetPos);
- DebeziumOffset specificOffset = new DebeziumOffset();
- specificOffset.setSourceOffset(sourceOffset);
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put("server", instanceId);
- specificOffset.setSourcePartition(sourcePartition);
- byte[] serializedOffset = new byte[0];
- try {
- serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
- } catch (IOException e) {
- LOGGER.error("serialize offset message error", e);
- }
- return new String(serializedOffset, StandardCharsets.UTF_8);
- }
-
@Override
public void destroy() {
synchronized (this) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
index 3abe037bb..eee9e4bea 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -35,15 +35,12 @@ import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.snapshot.MongoDBSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
import org.apache.inlong.agent.utils.GsonUtil;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
@@ -305,8 +302,13 @@ public class MongoDBReader extends AbstractReader {
String snapshotMode = props.getOrDefault(JOB_MONGO_SNAPSHOT_MODE, "").toString();
if (Objects.equals(SnapshotModeConstants.INITIAL, snapshotMode)) {
+ Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE,
+ JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+ Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS,
+ JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
- props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+ props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+ serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
} else {
props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
}
@@ -325,28 +327,6 @@ public class MongoDBReader extends AbstractReader {
builder.with(field, value);
}
- private String serializeOffset() {
- Map<String, Object> sourceOffset = new HashMap<>();
- Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE,
- JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
- sourceOffset.put("file", specificOffsetFile);
- Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS,
- JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
- sourceOffset.put("pos", specificOffsetPos);
- DebeziumOffset specificOffset = new DebeziumOffset();
- specificOffset.setSourceOffset(sourceOffset);
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put("server", instanceId);
- specificOffset.setSourcePartition(sourcePartition);
- byte[] serializedOffset = new byte[0];
- try {
- serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
- } catch (IOException e) {
- LOGGER.error("serialize offset message error", e);
- }
- return new String(serializedOffset, StandardCharsets.UTF_8);
- }
-
/**
* Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)}
* for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished.
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
similarity index 72%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
index 079d9af07..bfc80542a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
@@ -1,326 +1,308 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sources.reader;
-
-import com.google.common.base.Preconditions;
-import com.google.gson.Gson;
-import io.debezium.connector.sqlserver.SqlServerConnector;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import io.debezium.relational.history.FileDatabaseHistory;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.SnapshotModeConstants;
-import org.apache.inlong.agent.constant.SqlServerConstants;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
-import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
-import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
-import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
-import org.apache.inlong.agent.utils.GsonUtil;
-import org.apache.kafka.connect.storage.FileOffsetBackingStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-
-/**
- * Read data from SQLServer database by Debezium
- */
-public class SQLServerReader extends AbstractReader {
-
- public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
- public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
- public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
- public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
- public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
- public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
- public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode";
- public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize";
- public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets";
- public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile";
- public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos";
-
- public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName";
-
- public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
- public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
- private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
-
- private static final Gson GSON = new Gson();
-
- private String databaseStoreHistoryName;
- private String instanceId;
- private String dbName;
- private String serverName;
- private String userName;
- private String password;
- private String hostName;
- private String port;
- private String offsetFlushIntervalMs;
- private String offsetStoreFileName;
- private String snapshotMode;
- private String offset;
- private String specificOffsetFile;
- private String specificOffsetPos;
-
- private ExecutorService executor;
- private SqlServerSnapshotBase sqlServerSnapshot;
- private boolean finished = false;
-
- private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
- private JobProfile jobProfile;
- private boolean destroyed = false;
-
- public SQLServerReader() {
-
- }
-
- @Override
- public Message read() {
- if (!sqlServerMessageQueue.isEmpty()) {
- return getSqlServerMessage();
- } else {
- return null;
- }
- }
-
- /**
- * poll message from buffer pool
- *
- * @return org.apache.inlong.agent.plugin.Message
- */
- private DefaultMessage getSqlServerMessage() {
- // Retrieves and removes the head of this queue,
- // or returns null if this queue is empty.
- Pair<String, String> message = sqlServerMessageQueue.poll();
- if (Objects.isNull(message)) {
- return null;
- }
- Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
- header.put(PROXY_KEY_DATA, message.getKey());
- return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
- }
-
- public boolean isDestroyed() {
- return destroyed;
- }
-
- @Override
- public boolean isFinished() {
- return finished;
- }
-
- @Override
- public String getReadSource() {
- return instanceId;
- }
-
- @Override
- public void setReadTimeout(long mill) {
-
- }
-
- @Override
- public void setWaitMillisecond(long millis) {
-
- }
-
- @Override
- public String getSnapshot() {
- if (sqlServerSnapshot != null) {
- return sqlServerSnapshot.getSnapshot();
- } else {
- return StringUtils.EMPTY;
- }
- }
-
- @Override
- public void finishRead() {
- this.finished = true;
- }
-
- @Override
- public boolean isSourceExist() {
- return true;
- }
-
- private String tryToInitAndGetHistoryPath() {
- String historyPath = agentConf.get(
- AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
- String parentPath = agentConf.get(
- AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
- return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
- }
-
- @Override
- public void init(JobProfile jobConf) {
- super.init(jobConf);
- jobProfile = jobConf;
- LOGGER.info("init SqlServer reader with jobConf {}", jobConf.toJsonStr());
- userName = jobConf.get(JOB_DATABASE_USER);
- password = jobConf.get(JOB_DATABASE_PASSWORD);
- hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
- port = jobConf.get(JOB_DATABASE_PORT);
- dbName = jobConf.get(JOB_DATABASE_DBNAME);
- serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
- instanceId = jobConf.getInstanceId();
- offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
- offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
- tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
- snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL);
- sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
- finished = false;
-
- databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
- tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
- offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
- specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
- specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
-
- sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName);
- sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile());
-
- Properties props = getEngineProps();
-
- DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
- io.debezium.engine.format.Json.class)
- .using(props)
- .notifying((records, committer) -> {
- try {
- for (ChangeEvent<String, String> record : records) {
- DebeziumFormat debeziumFormat = GSON
- .fromJson(record.value(), DebeziumFormat.class);
- sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
- committer.markProcessed(record);
- }
- committer.markBatchFinished();
- long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
- System.currentTimeMillis(), records.size(), dataSize);
- readerMetric.pluginReadSuccessCount.addAndGet(records.size());
- readerMetric.pluginReadCount.addAndGet(records.size());
- } catch (Exception e) {
- readerMetric.pluginReadFailCount.addAndGet(records.size());
- readerMetric.pluginReadCount.addAndGet(records.size());
- LOGGER.error("parse SqlServer message error", e);
- }
- })
- .using((success, message, error) -> {
- if (!success) {
- LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error);
- }
- }).build();
-
- executor = Executors.newSingleThreadExecutor();
- executor.execute(engine);
-
- LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
- }
-
- private String serializeOffset() {
- Map<String, Object> sourceOffset = new HashMap<>();
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
- sourceOffset.put("file", specificOffsetFile);
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
- sourceOffset.put("pos", specificOffsetPos);
- DebeziumOffset specificOffset = new DebeziumOffset();
- specificOffset.setSourceOffset(sourceOffset);
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put("server", instanceId);
- specificOffset.setSourcePartition(sourcePartition);
- byte[] serializedOffset = new byte[0];
- try {
- serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
- } catch (IOException e) {
- LOGGER.error("serialize offset message error", e);
- }
- return new String(serializedOffset, StandardCharsets.UTF_8);
- }
-
- private Properties getEngineProps() {
- Properties props = new Properties();
- props.setProperty("name", "engine" + instanceId);
- props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
- props.setProperty("database.hostname", hostName);
- props.setProperty("database.port", port);
- props.setProperty("database.user", userName);
- props.setProperty("database.password", password);
- props.setProperty("database.dbname", dbName);
- props.setProperty("database.server.name", serverName);
- props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
- props.setProperty("database.snapshot.mode", snapshotMode);
- props.setProperty("key.converter.schemas.enable", "false");
- props.setProperty("value.converter.schemas.enable", "false");
- props.setProperty("snapshot.mode", snapshotMode);
- props.setProperty("offset.storage.file.filename", offsetStoreFileName);
- props.setProperty("database.history.file.filename", databaseStoreHistoryName);
- if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
- props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
- props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
- props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
- } else {
- props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
- props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
- }
- props.setProperty("tombstones.on.delete", "false");
- props.setProperty("converters", "datetime");
- props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
- props.setProperty("datetime.format.date", "yyyy-MM-dd");
- props.setProperty("datetime.format.time", "HH:mm:ss");
- props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
- props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
-
- LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), props);
- return props;
- }
-
- @Override
- public void destroy() {
- synchronized (this) {
- if (!destroyed) {
- this.executor.shutdownNow();
- this.sqlServerSnapshot.close();
- this.destroyed = true;
- }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import io.debezium.connector.oracle.OracleConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.OracleConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.GsonUtil;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+
+/**
+ * Read data from Oracle database by Debezium
+ */
+public class OracleReader extends AbstractReader {
+
+ public static final String ORACLE_READER_TAG_NAME = "AgentOracleMetric";
+ public static final String JOB_DATABASE_USER = "job.oracleJob.user";
+ public static final String JOB_DATABASE_PASSWORD = "job.oracleJob.password";
+ public static final String JOB_DATABASE_HOSTNAME = "job.oracleJob.hostname";
+ public static final String JOB_DATABASE_PORT = "job.oracleJob.port";
+ public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.oracleJob.snapshot.mode";
+ public static final String JOB_DATABASE_SERVER_NAME = "job.oracleJob.serverName";
+ public static final String JOB_DATABASE_QUEUE_SIZE = "job.oracleJob.queueSize";
+ public static final String JOB_DATABASE_OFFSETS = "job.oracleJob.offsets";
+ public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.oracleJob.offset.specificOffsetFile";
+ public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.oracleJob.offset.specificOffsetPos";
+ public static final String JOB_DATABASE_DBNAME = "job.oracleJob.dbname";
+ public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.oracleJob.offset.intervalMs";
+ public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.oracleJob.history.filename";
+
+ private static final Gson GSON = new Gson();
+ public static final String ORACLE = "oracle";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OracleReader.class);
+
+ private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+
+ private String databaseStoreHistoryName;
+ private String instanceId;
+ private String dbName;
+ private String serverName;
+ private String userName;
+ private String password;
+ private String hostName;
+ private String port;
+ private String offsetFlushIntervalMs;
+ private String offsetStoreFileName;
+ private String snapshotMode;
+ private String offset;
+ private String specificOffsetFile;
+ private String specificOffsetPos;
+ private OracleSnapshotBase oracleSnapshot;
+ private boolean finished = false;
+ private ExecutorService executor;
+
+ /**
+ * pair.left : table name
+ * pair.right : actual data
+ */
+ private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue;
+ private JobProfile jobProfile;
+ private boolean destroyed = false;
+
+ public OracleReader() {
+ }
+
+ @Override
+ public Message read() {
+ if (!oracleMessageQueue.isEmpty()) {
+ return getOracleMessage();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * poll message from buffer pool
+ *
+ * @return org.apache.inlong.agent.plugin.Message
+ */
+ private DefaultMessage getOracleMessage() {
+ // Retrieves and removes the head of this queue,
+ // or returns null if this queue is empty.
+ Pair<String, String> message = oracleMessageQueue.poll();
+ if (Objects.isNull(message)) {
+ return null;
+ }
+ Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
+ header.put(PROXY_KEY_DATA, message.getKey());
+ return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
+ }
+
+ public boolean isDestroyed() {
+ return destroyed;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return finished;
+ }
+
+ @Override
+ public String getReadSource() {
+ return instanceId;
+ }
+
+ @Override
+ public void setReadTimeout(long mill) {
+
+ }
+
+ @Override
+ public void setWaitMillisecond(long millis) {
+
+ }
+
+ @Override
+ public String getSnapshot() {
+ if (oracleSnapshot != null) {
+ return oracleSnapshot.getSnapshot();
+ } else {
+ return StringUtils.EMPTY;
+ }
+ }
+
+ @Override
+ public void finishRead() {
+ this.finished = true;
+ }
+
+ @Override
+ public boolean isSourceExist() {
+ return true;
+ }
+
+ private String tryToInitAndGetHistoryPath() {
+ String historyPath = agentConf.get(
+ AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+ String parentPath = agentConf.get(
+ AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+ return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
+ }
+
+ @Override
+ public void init(JobProfile jobConf) {
+ super.init(jobConf);
+ jobProfile = jobConf;
+ LOGGER.info("init oracle reader with jobConf {}", jobConf.toJsonStr());
+ userName = jobConf.get(JOB_DATABASE_USER);
+ password = jobConf.get(JOB_DATABASE_PASSWORD);
+ hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+ port = jobConf.get(JOB_DATABASE_PORT);
+ dbName = jobConf.get(JOB_DATABASE_DBNAME);
+ serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
+ instanceId = jobConf.getInstanceId();
+ offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
+ offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+ tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
+ snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, OracleConstants.INITIAL);
+ oracleMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
+ finished = false;
+
+ databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+ tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
+ offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+ specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+ specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+ oracleSnapshot = new OracleSnapshotBase(offsetStoreFileName);
+ oracleSnapshot.save(offset, oracleSnapshot.getFile());
+
+ Properties props = getEngineProps();
+
+ DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
+ io.debezium.engine.format.Json.class)
+ .using(props)
+ .notifying((records, committer) -> {
+ try {
+ for (ChangeEvent<String, String> record : records) {
+ DebeziumFormat debeziumFormat = GSON
+ .fromJson(record.value(), DebeziumFormat.class);
+ oracleMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
+ committer.markProcessed(record);
+ }
+ committer.markBatchFinished();
+ long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+ System.currentTimeMillis(), records.size(), dataSize);
+ readerMetric.pluginReadSuccessCount.addAndGet(records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
+ } catch (Exception e) {
+ readerMetric.pluginReadFailCount.addAndGet(records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
+ LOGGER.error("parse oracle message error", e);
+ }
+ })
+ .using((success, message, error) -> {
+ if (!success) {
+ LOGGER.error("oracle job with jobConf {} has error {}", instanceId, message, error);
+ }
+ }).build();
+
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(engine);
+
+ LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
+ }
+
+ private Properties getEngineProps() {
+ Properties props = new Properties();
+ props.setProperty("name", "engine" + instanceId);
+ props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
+ props.setProperty("database.hostname", hostName);
+ props.setProperty("database.port", port);
+ props.setProperty("database.user", userName);
+ props.setProperty("database.password", password);
+ props.setProperty("database.dbname", dbName);
+ props.setProperty("database.server.name", serverName);
+ props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+ props.setProperty("database.snapshot.mode", snapshotMode);
+ props.setProperty("key.converter.schemas.enable", "false");
+ props.setProperty("value.converter.schemas.enable", "false");
+ props.setProperty("snapshot.mode", snapshotMode);
+ props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+ props.setProperty("database.history.file.filename", databaseStoreHistoryName);
+ if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+ Preconditions.checkNotNull(specificOffsetFile,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+ Preconditions.checkNotNull(specificOffsetPos,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
+ props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+ props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+ serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
+ props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
+ } else {
+ props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+ props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
+ }
+ props.setProperty("tombstones.on.delete", "false");
+ props.setProperty("converters", "datetime");
+ props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
+ props.setProperty("datetime.format.date", "yyyy-MM-dd");
+ props.setProperty("datetime.format.time", "HH:mm:ss");
+ props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+ props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+
+ LOGGER.info("oracle job {} start with props {}", jobProfile.getInstanceId(), props);
+ return props;
+ }
+
+ @Override
+ public void destroy() {
+ synchronized (this) {
+ if (!destroyed) {
+ this.executor.shutdownNow();
+ this.oracleSnapshot.close();
+ this.destroyed = true;
+ }
+ }
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index 23bfa1fbe..38f02e277 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -34,14 +34,11 @@ import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -212,8 +209,13 @@ public class PostgreSQLReader extends AbstractReader {
props.setProperty("snapshot.mode", snapshotMode);
props.setProperty("offset.storage.file.filename", offsetStoreFileName);
if (PostgreSQLConstants.CUSTOM.equals(snapshotMode)) {
+ Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+ Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
- props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+ props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+ serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
} else {
props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
}
@@ -229,28 +231,6 @@ public class PostgreSQLReader extends AbstractReader {
return props;
}
- private String serializeOffset() {
- Map<String, Object> sourceOffset = new HashMap<>();
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
- sourceOffset.put("file", specificOffsetFile);
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
- sourceOffset.put("pos", specificOffsetPos);
- DebeziumOffset specificOffset = new DebeziumOffset();
- specificOffset.setSourceOffset(sourceOffset);
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put("server", instanceId);
- specificOffset.setSourcePartition(sourcePartition);
- byte[] serializedOffset = new byte[0];
- try {
- serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
- } catch (IOException e) {
- LOGGER.error("serialize offset message error", e);
- }
- return new String(serializedOffset, StandardCharsets.UTF_8);
- }
-
@Override
public void destroy() {
synchronized (this) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index 079d9af07..3a6eeafcc 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -37,15 +37,12 @@ import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
import org.apache.inlong.agent.utils.GsonUtil;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -80,7 +77,7 @@ public class SQLServerReader extends AbstractReader {
public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
- private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerReader.class);
private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
private static final Gson GSON = new Gson();
@@ -254,28 +251,6 @@ public class SQLServerReader extends AbstractReader {
LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
}
- private String serializeOffset() {
- Map<String, Object> sourceOffset = new HashMap<>();
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
- sourceOffset.put("file", specificOffsetFile);
- Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
- JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
- sourceOffset.put("pos", specificOffsetPos);
- DebeziumOffset specificOffset = new DebeziumOffset();
- specificOffset.setSourceOffset(sourceOffset);
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put("server", instanceId);
- specificOffset.setSourcePartition(sourcePartition);
- byte[] serializedOffset = new byte[0];
- try {
- serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
- } catch (IOException e) {
- LOGGER.error("serialize offset message error", e);
- }
- return new String(serializedOffset, StandardCharsets.UTF_8);
- }
-
private Properties getEngineProps() {
Properties props = new Properties();
props.setProperty("name", "engine" + instanceId);
@@ -294,8 +269,13 @@ public class SQLServerReader extends AbstractReader {
props.setProperty("offset.storage.file.filename", offsetStoreFileName);
props.setProperty("database.history.file.filename", databaseStoreHistoryName);
if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+ Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+ Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
- props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+ props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+ serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
} else {
props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java
new file mode 100644
index 000000000..4c19c3eea
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Oracle Snapshot
+ */
+public class OracleSnapshotBase extends AbstractSnapshot {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotBase.class);
+ private final File file;
+
+ public OracleSnapshotBase(String filePath) {
+ file = new File(filePath);
+ }
+
+ @Override
+ public String getSnapshot() {
+ byte[] offset = this.load(this.file);
+ return ENCODER.encodeToString(offset);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
new file mode 100644
index 000000000..f577ed5db
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
+/**
+ * Test cases for {@link OracleReader}.
+ */
+public class TestOracleConnect {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestOracleConnect.class);
+
+ @Ignore
+ public void testOracle() {
+ JobProfile jobProfile = new JobProfile();
+ jobProfile.set("job.oracleJob.hostname", "localhost");
+ jobProfile.set("job.oracleJob.port", "1521");
+ jobProfile.set("job.oracleJob.user", "c##dbzuser");
+ jobProfile.set("job.oracleJob.password", "dbz");
+ jobProfile.set("job.oracleJob.sid", "ORCLCDB");
+ jobProfile.set("job.oracleJob.dbname", "ORCLCDB");
+ jobProfile.set("job.oracleJob.serverName", "server1");
+ jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString());
+ jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+ jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+ OracleReader oracleReader = new OracleReader();
+ oracleReader.init(jobProfile);
+ while (true) {
+ Message message = oracleReader.read();
+ if (message != null) {
+ LOGGER.info("event content: {}", message);
+ }
+ }
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
new file mode 100644
index 000000000..db24e3acc
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.OracleConstants;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.field;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for {@link OracleReader}.
+ *
+ * <p>{@link #setUp()} stubs the job profile and replaces the collaborators the reader
+ * obtains during {@code init} (snapshot helper, message queue, Debezium engine builder,
+ * single-thread executor and metric item set) with mocks, so no database is contacted.</p>
+ *
+ * <p>NOTE(review): {@code setUp} stubs {@code JOB_DATABASE_STORE_HISTORY_FILENAME} twice:
+ * the two-argument {@code get} returns the offset store file name and the one-argument
+ * {@code get} returns the history file name. The first stub looks like it was meant for an
+ * offset-file key; confirm against {@link OracleReader}.</p>
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, OracleReader.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleReader {
+
+ private OracleReader reader;
+
+ @Mock
+ private JobProfile jobProfile;
+
+ @Mock
+ private AgentMetricItemSet agentMetricItemSet;
+
+ @Mock
+ private AgentMetricItem agentMetricItem;
+
+ @Mock
+ private OracleSnapshotBase oracleSnapshot;
+
+ @Mock
+ private DebeziumEngine.Builder builder;
+
+ @Mock
+ private ExecutorService executorService;
+
+ @Mock
+ private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue;
+
+ @Mock
+ private DebeziumEngine<ChangeEvent<String, String>> engine;
+
+ private AtomicLong atomicLong;
+
+ private AtomicLong atomicCountLong;
+
+ private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
+
+ @Before
+ public void setUp() throws Exception {
+ final String username = "sa";
+ final String password = "123456";
+ final String hostname = "127.0.0.1";
+ final String port = "1434";
+ final String groupId = "group01";
+ final String streamId = "stream01";
+ final String dbName = "testdb";
+ final String serverName = "serverName";
+ final String offsetFlushIntervalMs = "1000";
+ final String offsetStoreFileName = "/opt/offset.dat";
+ final String snapshotMode = OracleConstants.INITIAL;
+ final int queueSize = 1000;
+ final String databaseStoreHistoryName = "/opt/history.dat";
+ final String offset = "111";
+ final String specificOffsetFile = "";
+ final String specificOffsetPos = "-1";
+
+ atomicLong = new AtomicLong(0L);
+ atomicCountLong = new AtomicLong(0L);
+
+ when(jobProfile.getInstanceId()).thenReturn(instanceId);
+ when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId);
+ when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_USER))).thenReturn(username);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PASSWORD))).thenReturn(password);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PORT))).thenReturn(port);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_DBNAME))).thenReturn(dbName);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn(
+ offsetFlushIntervalMs);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn(
+ offsetStoreFileName);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode);
+ when(jobProfile.getInt(eq(OracleReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn(databaseStoreHistoryName);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn(
+ specificOffsetFile);
+ when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn(
+ specificOffsetPos);
+ whenNew(OracleSnapshotBase.class).withAnyArguments().thenReturn(oracleSnapshot);
+
+ // mock oracleMessageQueue: any LinkedBlockingQueue constructed in a prepared class is this mock
+ whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(oracleMessageQueue);
+
+ // mock DebeziumEngine: the static factory and every stubbed builder step return mocks
+ mockStatic(DebeziumEngine.class);
+ when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+ when(builder.using(any(Properties.class))).thenReturn(builder);
+ when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+ when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+ when(builder.build()).thenReturn(engine);
+
+ // mock executorService: newSingleThreadExecutor() returns the mock, which testDestroy verifies
+ mockStatic(Executors.class);
+ when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
+
+ // mock metrics: the mocked item gets real AtomicLong counters injected by reflection
+ whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
+ when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+ field(AgentMetricItem.class, "pluginReadCount").set(agentMetricItem, atomicLong);
+ field(AgentMetricItem.class, "pluginReadSuccessCount").set(agentMetricItem, atomicCountLong);
+
+ // create the reader under test and initialise it with MetricRegister statics mocked
+ mockStatic(MetricRegister.class);
+ (reader = new OracleReader()).init(jobProfile);
+ }
+
+ /**
+ * Test cases for {@link OracleReader#read()}.
+ *
+ * <p>Expects {@code null} while the mocked queue reports empty. Once the queue yields a
+ * key/value pair, the message must render as the value wrapped in double quotes and carry
+ * the key under the {@code dataKey} header.</p>
+ */
+ @Test
+ public void testRead() throws Exception {
+ final String right = "value";
+ final String left = "key";
+ final String dataKey = "dataKey";
+ when(oracleMessageQueue.isEmpty()).thenReturn(true);
+ assertEquals(null, reader.read());
+ when(oracleMessageQueue.isEmpty()).thenReturn(false);
+ when(oracleMessageQueue.poll()).thenReturn(Pair.of(left, right));
+ Message result = reader.read();
+ assertEquals(String.join(right, "\"", "\""), result.toString());
+ assertEquals(left, result.getHeader().get(dataKey));
+ }
+
+ /**
+ * Test cases for {@link OracleReader#destroy()}.
+ *
+ * <p>Verifies that destroying the reader calls {@code shutdownNow()} on the executor,
+ * closes the snapshot helper and flips {@code isDestroyed()} from false to true.</p>
+ */
+ @Test
+ public void testDestroy() throws Exception {
+ assertFalse(reader.isDestroyed());
+ reader.destroy();
+ verify(executorService).shutdownNow();
+ verify(oracleSnapshot).close();
+ assertTrue(reader.isDestroyed());
+ }
+
+ /**
+ * Test cases for {@link OracleReader#finishRead()}.
+ *
+ * <p>Verifies that {@code isFinished()} is false after init and true after the call.</p>
+ */
+ @Test
+ public void testFinishRead() throws Exception {
+ assertFalse(reader.isFinished());
+ reader.finishRead();
+ assertTrue(reader.isFinished());
+ }
+
+ /**
+ * Test cases for {@link OracleReader#isSourceExist()}.
+ *
+ * <p>Verifies that an initialised reader reports its source as existing.</p>
+ */
+ @Test
+ public void testIsSourceExist() {
+ assertTrue(reader.isSourceExist());
+ }
+
+ /**
+ * Test cases for {@link OracleReader#getSnapshot()}.
+ *
+ * <p>Verifies that the reader returns the value supplied by the mocked snapshot helper.</p>
+ */
+ @Test
+ public void testGetSnapshot() {
+ final String snapShort = "snapShort";
+ when(oracleSnapshot.getSnapshot()).thenReturn(snapShort);
+ assertEquals(snapShort, reader.getSnapshot());
+ }
+
+ /**
+ * Test cases for {@link OracleReader#getReadSource()}.
+ *
+ * <p>Verifies that the read source equals the instance id stubbed on the job profile.</p>
+ */
+ @Test
+ public void testGetReadSource() {
+ assertEquals(instanceId, reader.getReadSource());
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
new file mode 100644
index 000000000..160357e77
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.common.metric.MetricItem;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.powermock.api.support.membermodification.MemberMatcher.field;
+
+/**
+ * Unit tests for {@link OracleSource}, run with the metric classes replaced by mocks.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({OracleSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleSource {
+
+    @Mock
+    JobProfile jobProfile;
+
+    @Mock
+    private AgentMetricItemSet agentMetricItemSet;
+
+    @Mock
+    private AgentMetricItem agentMetricItem;
+
+    private AtomicLong sourceSuccessCount;
+
+    private AtomicLong sourceFailCount;
+
+    /**
+     * Resets the counters and wires the metric mocks before every test.
+     */
+    @Before
+    public void setup() throws Exception {
+        sourceSuccessCount = new AtomicLong();
+        sourceFailCount = new AtomicLong();
+
+        // metric item set: constructing one yields the mock, which hands out the mocked item
+        PowerMockito.whenNew(AgentMetricItemSet.class)
+                .withArguments(anyString())
+                .thenReturn(agentMetricItemSet);
+        PowerMockito.when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+
+        // metric item: inject real counters into the mock by reflection
+        field(AgentMetricItem.class, "sourceSuccessCount").set(agentMetricItem, sourceSuccessCount);
+        field(AgentMetricItem.class, "sourceFailCount").set(agentMetricItem, sourceFailCount);
+
+        // metric registration: static register(...) becomes a no-op
+        PowerMockito.mockStatic(MetricRegister.class);
+        PowerMockito.doNothing().when(MetricRegister.class, "register", any(MetricItem.class));
+    }
+
+    /**
+     * Checks that {@link OracleSource#split(JobProfile)} produces exactly one element.
+     */
+    @Test
+    public void testSplit() {
+        final int splitCount = new OracleSource().split(jobProfile).size();
+        assertEquals(1, splitCount);
+    }
+}
diff --git a/pom.xml b/pom.xml
index 08785dc82..7f0f54c35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -623,6 +623,13 @@
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-oracle</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>