You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/09 10:35:22 UTC

[inlong] branch master updated: [INLONG-6176][Agent] Support collect data from Oracle (#6203)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cc3da3797 [INLONG-6176][Agent] Support collect data from Oracle (#6203)
cc3da3797 is described below

commit cc3da379709a767e277a756500371aa2333946bd
Author: haibo.duan <dh...@live.cn>
AuthorDate: Wed Nov 9 18:35:16 2022 +0800

    [INLONG-6176][Agent] Support collect data from Oracle (#6203)
---
 .../inlong/agent/constant/OracleConstants.java     |  50 ++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  53 +-
 .../org/apache/inlong/agent/pojo/OracleJob.java    |  75 +++
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../inlong/agent/plugin/sources/OracleSource.java  |  48 ++
 .../plugin/sources/reader/AbstractReader.java      |  35 ++
 .../agent/plugin/sources/reader/BinlogReader.java  |  32 +-
 .../agent/plugin/sources/reader/MongoDBReader.java |  32 +-
 .../{SQLServerReader.java => OracleReader.java}    | 634 ++++++++++-----------
 .../plugin/sources/reader/PostgreSQLReader.java    |  32 +-
 .../plugin/sources/reader/SQLServerReader.java     |  34 +-
 .../sources/snapshot/OracleSnapshotBase.java       |  52 ++
 .../agent/plugin/sources/TestOracleConnect.java    |  62 ++
 .../agent/plugin/sources/TestOracleReader.java     | 234 ++++++++
 .../agent/plugin/sources/TestOracleSource.java     |  90 +++
 pom.xml                                            |   7 +
 16 files changed, 1037 insertions(+), 438 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java
new file mode 100644
index 000000000..6ba84a7c3
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+public class OracleConstants {
+
+    /**
+     * The snapshot includes the structure and data of the captured tables.
+     * Specify this value to populate topics with a complete representation of the data from the captured tables.
+     */
+    public static final String INITIAL = "initial";
+
+    /**
+     * The snapshot includes the structure and data of the captured tables.
+     * The connector performs an initial snapshot and then stops, without processing any subsequent changes.
+     */
+    public static final String INITIAL_ONLY = "initial_only";
+
+    /**
+     * The snapshot includes only the structure of captured tables.
+     * Specify this value if you want the connector to capture data only for changes that occur after the snapshot.
+     */
+    public static final String SCHEMA_ONLY = "schema_only";
+
+    /**
+     * This is a recovery setting for a connector that has already been capturing changes.
+     * When you restart the connector, this setting enables recovery of a corrupted or lost database history topic.
+     * You might set it periodically to "clean up" a database history topic that has been growing unexpectedly.
+     * Database history topics require infinite retention. Note this mode is only safe to use when it is guaranteed
+     * that no schema changes happened between the point in time the connector was shut down and the point in time
+     * the snapshot is taken.
+     */
+    public static final String SCHEMA_ONLY_RECOVERY = "schema_only_recovery";
+
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 810a42d2b..f967dd1a8 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -54,6 +54,10 @@ public class JobProfileDto {
      * mongo source
      */
     public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource";
+    /**
+     * oracle source
+     */
+    public static final String ORACLE_SOURCE = "org.apache.inlong.agent.plugin.sources.OracleSource";
     /**
      * mqtt source
      */
@@ -230,10 +234,10 @@ public class JobProfileDto {
         return mongoJob;
     }
 
-    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
-        SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
-                SqlServerJob.SqlserverJobConfig.class);
-        SqlServerJob oracleJob = new SqlServerJob();
+    private static OracleJob getOracleJob(DataConfig dataConfigs) {
+        OracleJob.OracleJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+                OracleJob.OracleJobConfig.class);
+        OracleJob oracleJob = new OracleJob();
         oracleJob.setUser(config.getUser());
         oracleJob.setHostname(config.getHostname());
         oracleJob.setPassword(config.getPassword());
@@ -241,23 +245,51 @@ public class JobProfileDto {
         oracleJob.setServerName(config.getServerName());
         oracleJob.setDbname(config.getDbname());
 
-        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        OracleJob.Offset offset = new OracleJob.Offset();
         offset.setFilename(config.getOffsetFilename());
         offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
         offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
         oracleJob.setOffset(offset);
 
-        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        OracleJob.Snapshot snapshot = new OracleJob.Snapshot();
         snapshot.setMode(config.getSnapshotMode());
         oracleJob.setSnapshot(snapshot);
 
-        SqlServerJob.History history = new SqlServerJob.History();
+        OracleJob.History history = new OracleJob.History();
         history.setFilename(config.getHistoryFilename());
         oracleJob.setHistory(history);
 
         return oracleJob;
     }
 
+    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+        SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+                SqlServerJob.SqlserverJobConfig.class);
+        SqlServerJob sqlServerJob = new SqlServerJob();
+        sqlServerJob.setUser(config.getUser());
+        sqlServerJob.setHostname(config.getHostname());
+        sqlServerJob.setPassword(config.getPassword());
+        sqlServerJob.setPort(config.getPort());
+        sqlServerJob.setServerName(config.getServerName());
+        sqlServerJob.setDbname(config.getDbname());
+
+        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        sqlServerJob.setOffset(offset);
+
+        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        sqlServerJob.setSnapshot(snapshot);
+
+        SqlServerJob.History history = new SqlServerJob.History();
+        history.setFilename(config.getHistoryFilename());
+        sqlServerJob.setHistory(history);
+
+        return sqlServerJob;
+    }
+
     public static MqttJob getMqttJob(DataConfig dataConfigs) {
         MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
                 MqttJob.MqttJobConfig.class);
@@ -341,6 +373,12 @@ public class JobProfileDto {
                 job.setSource(KAFKA_SOURCE);
                 profileDto.setJob(job);
                 break;
+            case ORACLE:
+                OracleJob oracleJob = getOracleJob(dataConfig);
+                job.setOracleJob(oracleJob);
+                job.setSource(ORACLE_SOURCE);
+                profileDto.setJob(job);
+                break;
             case SQLSERVER:
                 SqlServerJob sqlserverJob = getSqlServerJob(dataConfig);
                 job.setSqlserverJob(sqlserverJob);
@@ -385,6 +423,7 @@ public class JobProfileDto {
         private FileJob fileJob;
         private BinlogJob binlogJob;
         private KafkaJob kafkaJob;
+        private OracleJob oracleJob;
         private MongoJob mongoJob;
         private MqttJob mqttJob;
         private SqlServerJob sqlserverJob;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java
new file mode 100644
index 000000000..ef2420b3a
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class OracleJob {
+
+    private String hostname;
+    private String user;
+    private String password;
+    private String port;
+    private String serverName;
+    private String dbname;
+
+    private OracleJob.Snapshot snapshot;
+    private OracleJob.Offset offset;
+    private OracleJob.History history;
+
+    @Data
+    public static class Offset {
+
+        private String intervalMs;
+        private String filename;
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+
+    @Data
+    public static class Snapshot {
+
+        private String mode;
+    }
+
+    @Data
+    public static class History {
+
+        private String filename;
+    }
+
+    @Data
+    public static class OracleJobConfig {
+
+        private String hostname;
+        private String user;
+        private String password;
+        private String port;
+        private String dbname;
+        private String serverName;
+
+        private String snapshotMode;
+        private String intervalMs;
+        private String offsetFilename;
+        private String historyFilename;
+
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index fa8e03a41..85520f508 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -85,6 +85,11 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-oracle</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-sqlserver</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
new file mode 100644
index 000000000..b78f18dac
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Oracle SQL source
+ */
+public class OracleSource extends AbstractSource {
+
+    private static final Logger logger = LoggerFactory.getLogger(OracleSource.class);
+
+    public OracleSource() {
+    }
+
+    @Override
+    public List<Reader> split(JobProfile conf) {
+        super.init(conf);
+        Reader oracleReader = new OracleReader();
+        List<Reader> readerList = new ArrayList<>();
+        readerList.add(oracleReader);
+        sourceMetric.sourceSuccessCount.incrementAndGet();
+        return readerList;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index 5736f21b9..115070dc4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -21,8 +21,14 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.inlong.common.metric.MetricRegister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -49,6 +55,8 @@ public abstract class AbstractReader implements Reader {
     protected String metricName;
     protected Map<String, String> dimensions;
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractReader.class);
+
     @Override
     public void init(JobProfile jobConf) {
         inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
@@ -68,4 +76,31 @@ public abstract class AbstractReader implements Reader {
     public String getInlongGroupId() {
         return inlongGroupId;
     }
+
+    /**
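+     * Serializes a specific offset (server, file and pos) as a DebeziumOffset string.
+     *
+     * @param server specific server, stored as the "server" entry of the source partition
+     * @param file specific offset file, stored as the "file" entry of the source offset
+     * @param pos specific offset pos, stored as the "pos" entry of the source offset
+     * @return the serialized offset decoded as UTF-8; an empty string if serialization fails with an IOException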
+     */
+    public String serializeOffset(final String server, final String file,
+            final String pos) {
+        Map<String, Object> sourceOffset = new HashMap<>();
+        sourceOffset.put("file", file);
+        sourceOffset.put("pos", pos);
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", server);
+        specificOffset.setSourcePartition(sourcePartition);
+        byte[] serializedOffset = new byte[0];
+        try {
+            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        } catch (IOException e) {
+            LOGGER.error("serialize offset message error", e);
+        }
+        return new String(serializedOffset, StandardCharsets.UTF_8);
+    }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 38877951b..fa61c0a10 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -35,14 +35,11 @@ import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -215,8 +212,13 @@ public class BinlogReader extends AbstractReader {
         props.setProperty("offset.storage.file.filename", offsetStoreFileName);
         props.setProperty("database.history.file.filename", databaseStoreHistoryName);
         if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
             props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
@@ -235,28 +237,6 @@ public class BinlogReader extends AbstractReader {
         return props;
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     @Override
     public void destroy() {
         synchronized (this) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
index 3abe037bb..eee9e4bea 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -35,15 +35,12 @@ import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.sources.snapshot.MongoDBSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.inlong.agent.utils.GsonUtil;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
@@ -305,8 +302,13 @@ public class MongoDBReader extends AbstractReader {
 
         String snapshotMode = props.getOrDefault(JOB_MONGO_SNAPSHOT_MODE, "").toString();
         if (Objects.equals(SnapshotModeConstants.INITIAL, snapshotMode)) {
+            Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE,
+                    JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS,
+                    JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
         }
@@ -325,28 +327,6 @@ public class MongoDBReader extends AbstractReader {
         builder.with(field, value);
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     /**
      * Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)}
      * for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished.
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
similarity index 72%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
index 079d9af07..bfc80542a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
@@ -1,326 +1,308 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sources.reader;
-
-import com.google.common.base.Preconditions;
-import com.google.gson.Gson;
-import io.debezium.connector.sqlserver.SqlServerConnector;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import io.debezium.relational.history.FileDatabaseHistory;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.SnapshotModeConstants;
-import org.apache.inlong.agent.constant.SqlServerConstants;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
-import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
-import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
-import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
-import org.apache.inlong.agent.utils.GsonUtil;
-import org.apache.kafka.connect.storage.FileOffsetBackingStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-
-/**
- * Read data from SQLServer database by Debezium
- */
-public class SQLServerReader extends AbstractReader {
-
-    public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
-    public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
-    public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
-    public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
-    public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
-    public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
-    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode";
-    public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize";
-    public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets";
-    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile";
-    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos";
-
-    public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName";
-
-    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
-    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
-    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
-
-    private static final Gson GSON = new Gson();
-
-    private String databaseStoreHistoryName;
-    private String instanceId;
-    private String dbName;
-    private String serverName;
-    private String userName;
-    private String password;
-    private String hostName;
-    private String port;
-    private String offsetFlushIntervalMs;
-    private String offsetStoreFileName;
-    private String snapshotMode;
-    private String offset;
-    private String specificOffsetFile;
-    private String specificOffsetPos;
-
-    private ExecutorService executor;
-    private SqlServerSnapshotBase sqlServerSnapshot;
-    private boolean finished = false;
-
-    private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
-    private JobProfile jobProfile;
-    private boolean destroyed = false;
-
-    public SQLServerReader() {
-
-    }
-
-    @Override
-    public Message read() {
-        if (!sqlServerMessageQueue.isEmpty()) {
-            return getSqlServerMessage();
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * poll message from buffer pool
-     *
-     * @return org.apache.inlong.agent.plugin.Message
-     */
-    private DefaultMessage getSqlServerMessage() {
-        // Retrieves and removes the head of this queue,
-        // or returns null if this queue is empty.
-        Pair<String, String> message = sqlServerMessageQueue.poll();
-        if (Objects.isNull(message)) {
-            return null;
-        }
-        Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
-        header.put(PROXY_KEY_DATA, message.getKey());
-        return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
-    }
-
-    public boolean isDestroyed() {
-        return destroyed;
-    }
-
-    @Override
-    public boolean isFinished() {
-        return finished;
-    }
-
-    @Override
-    public String getReadSource() {
-        return instanceId;
-    }
-
-    @Override
-    public void setReadTimeout(long mill) {
-
-    }
-
-    @Override
-    public void setWaitMillisecond(long millis) {
-
-    }
-
-    @Override
-    public String getSnapshot() {
-        if (sqlServerSnapshot != null) {
-            return sqlServerSnapshot.getSnapshot();
-        } else {
-            return StringUtils.EMPTY;
-        }
-    }
-
-    @Override
-    public void finishRead() {
-        this.finished = true;
-    }
-
-    @Override
-    public boolean isSourceExist() {
-        return true;
-    }
-
-    private String tryToInitAndGetHistoryPath() {
-        String historyPath = agentConf.get(
-                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
-        String parentPath = agentConf.get(
-                AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
-        return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
-    }
-
-    @Override
-    public void init(JobProfile jobConf) {
-        super.init(jobConf);
-        jobProfile = jobConf;
-        LOGGER.info("init SqlServer reader with jobConf {}", jobConf.toJsonStr());
-        userName = jobConf.get(JOB_DATABASE_USER);
-        password = jobConf.get(JOB_DATABASE_PASSWORD);
-        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
-        port = jobConf.get(JOB_DATABASE_PORT);
-        dbName = jobConf.get(JOB_DATABASE_DBNAME);
-        serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
-        instanceId = jobConf.getInstanceId();
-        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
-        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
-                tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
-        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL);
-        sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
-        finished = false;
-
-        databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
-                tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
-        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
-        specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
-        specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
-
-        sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName);
-        sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile());
-
-        Properties props = getEngineProps();
-
-        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
-                        io.debezium.engine.format.Json.class)
-                .using(props)
-                .notifying((records, committer) -> {
-                    try {
-                        for (ChangeEvent<String, String> record : records) {
-                            DebeziumFormat debeziumFormat = GSON
-                                    .fromJson(record.value(), DebeziumFormat.class);
-                            sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
-                            committer.markProcessed(record);
-                        }
-                        committer.markBatchFinished();
-                        long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
-                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                                System.currentTimeMillis(), records.size(), dataSize);
-                        readerMetric.pluginReadSuccessCount.addAndGet(records.size());
-                        readerMetric.pluginReadCount.addAndGet(records.size());
-                    } catch (Exception e) {
-                        readerMetric.pluginReadFailCount.addAndGet(records.size());
-                        readerMetric.pluginReadCount.addAndGet(records.size());
-                        LOGGER.error("parse SqlServer message error", e);
-                    }
-                })
-                .using((success, message, error) -> {
-                    if (!success) {
-                        LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error);
-                    }
-                }).build();
-
-        executor = Executors.newSingleThreadExecutor();
-        executor.execute(engine);
-
-        LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
-    }
-
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
-    private Properties getEngineProps() {
-        Properties props = new Properties();
-        props.setProperty("name", "engine" + instanceId);
-        props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
-        props.setProperty("database.hostname", hostName);
-        props.setProperty("database.port", port);
-        props.setProperty("database.user", userName);
-        props.setProperty("database.password", password);
-        props.setProperty("database.dbname", dbName);
-        props.setProperty("database.server.name", serverName);
-        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
-        props.setProperty("database.snapshot.mode", snapshotMode);
-        props.setProperty("key.converter.schemas.enable", "false");
-        props.setProperty("value.converter.schemas.enable", "false");
-        props.setProperty("snapshot.mode", snapshotMode);
-        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
-        props.setProperty("database.history.file.filename", databaseStoreHistoryName);
-        if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
-            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
-            props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
-        } else {
-            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
-        }
-        props.setProperty("tombstones.on.delete", "false");
-        props.setProperty("converters", "datetime");
-        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
-        props.setProperty("datetime.format.date", "yyyy-MM-dd");
-        props.setProperty("datetime.format.time", "HH:mm:ss");
-        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
-        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
-
-        LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), props);
-        return props;
-    }
-
-    @Override
-    public void destroy() {
-        synchronized (this) {
-            if (!destroyed) {
-                this.executor.shutdownNow();
-                this.sqlServerSnapshot.close();
-                this.destroyed = true;
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import io.debezium.connector.oracle.OracleConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.OracleConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.GsonUtil;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+
+/**
+ * Read data from Oracle database by Debezium
+ */
+public class OracleReader extends AbstractReader {
+
+    public static final String ORACLE_READER_TAG_NAME = "AgentOracleMetric";
+    public static final String JOB_DATABASE_USER = "job.oracleJob.user";
+    public static final String JOB_DATABASE_PASSWORD = "job.oracleJob.password";
+    public static final String JOB_DATABASE_HOSTNAME = "job.oracleJob.hostname";
+    public static final String JOB_DATABASE_PORT = "job.oracleJob.port";
+    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.oracleJob.snapshot.mode";
+    public static final String JOB_DATABASE_SERVER_NAME = "job.oracleJob.serverName";
+    public static final String JOB_DATABASE_QUEUE_SIZE = "job.oracleJob.queueSize";
+    public static final String JOB_DATABASE_OFFSETS = "job.oracleJob.offsets";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.oracleJob.offset.specificOffsetFile";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.oracleJob.offset.specificOffsetPos";
+    public static final String JOB_DATABASE_DBNAME = "job.oracleJob.dbname";
+    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.oracleJob.offset.intervalMs";
+    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.oracleJob.history.filename";
+
+    private static final Gson GSON = new Gson();
+    public static final String ORACLE = "oracle";
+
+    // Log under this reader's own class so Oracle entries are not attributed to SqlReader.
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleReader.class);
+
+    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+
+    private String databaseStoreHistoryName;
+    private String instanceId;
+    private String dbName;
+    private String serverName;
+    private String userName;
+    private String password;
+    private String hostName;
+    private String port;
+    private String offsetFlushIntervalMs;
+    private String offsetStoreFileName;
+    private String snapshotMode;
+    private String offset;
+    private String specificOffsetFile;
+    private String specificOffsetPos;
+    private OracleSnapshotBase oracleSnapshot;
+    private boolean finished = false;
+    private ExecutorService executor;
+
+    /**
+     * pair.left : table name
+     * pair.right : actual data
+     */
+    private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue;
+    private JobProfile jobProfile;
+    private boolean destroyed = false;
+
+    public OracleReader() {
+    }
+
+    /**
+     * Returns the next buffered change event without blocking.
+     *
+     * @return the next message, or null when the queue is currently empty
+     */
+    @Override
+    public Message read() {
+        // poll() already yields null on an empty queue, so no separate isEmpty() check is needed
+        return getOracleMessage();
+    }
+
+    /**
+     * poll message from buffer pool
+     *
+     * @return org.apache.inlong.agent.plugin.Message, or null if the queue is empty
+     */
+    private DefaultMessage getOracleMessage() {
+        // Retrieves and removes the head of this queue,
+        // or returns null if this queue is empty.
+        Pair<String, String> message = oracleMessageQueue.poll();
+        if (Objects.isNull(message)) {
+            return null;
+        }
+        // the pair key (table name) travels in the header, the pair value becomes the body
+        Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
+        header.put(PROXY_KEY_DATA, message.getKey());
+        return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
+    }
+
+    /**
+     * @return true once {@link #destroy()} has run
+     */
+    public boolean isDestroyed() {
+        return destroyed;
+    }
+
+    /**
+     * @return true once {@link #finishRead()} has been called since the last {@link #init(JobProfile)}
+     */
+    @Override
+    public boolean isFinished() {
+        return finished;
+    }
+
+    /**
+     * @return the instance id taken from the job profile in {@link #init(JobProfile)}
+     */
+    @Override
+    public String getReadSource() {
+        return instanceId;
+    }
+
+    /**
+     * No-op: this reader does not use a read timeout.
+     */
+    @Override
+    public void setReadTimeout(long mill) {
+
+    }
+
+    /**
+     * No-op: this reader does not use a wait interval.
+     */
+    @Override
+    public void setWaitMillisecond(long millis) {
+
+    }
+
+    /**
+     * @return the snapshot reported by the Oracle snapshot helper, or an empty string before init
+     */
+    @Override
+    public String getSnapshot() {
+        if (oracleSnapshot != null) {
+            return oracleSnapshot.getSnapshot();
+        } else {
+            return StringUtils.EMPTY;
+        }
+    }
+
+    /**
+     * Marks this reader as finished.
+     */
+    @Override
+    public void finishRead() {
+        this.finished = true;
+    }
+
+    /**
+     * Always reports the source as existing.
+     */
+    @Override
+    public boolean isSourceExist() {
+        return true;
+    }
+
+    /**
+     * Resolves the agent history directory from the agent configuration via
+     * AgentUtils.makeDirsIfNotExist and returns its absolute path.
+     */
+    private String tryToInitAndGetHistoryPath() {
+        String historyPath = agentConf.get(
+                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+        String parentPath = agentConf.get(
+                AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+        return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
+    }
+
+    /**
+     * Reads the Oracle job settings from the profile, saves the configured offset to the
+     * offset file, then builds a Debezium engine and starts it on a single-thread executor.
+     * Every change event is queued as a (table name, raw JSON value) pair for {@link #read()}.
+     *
+     * @param jobConf job profile carrying the job.oracleJob.* settings
+     */
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        jobProfile = jobConf;
+        // NOTE(review): jobConf.toJsonStr() presumably includes job.oracleJob.password - confirm and mask if so
+        LOGGER.info("init oracle reader with jobConf {}", jobConf.toJsonStr());
+        userName = jobConf.get(JOB_DATABASE_USER);
+        password = jobConf.get(JOB_DATABASE_PASSWORD);
+        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+        port = jobConf.get(JOB_DATABASE_PORT);
+        dbName = jobConf.get(JOB_DATABASE_DBNAME);
+        serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
+        instanceId = jobConf.getInstanceId();
+        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
+        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
+        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, OracleConstants.INITIAL);
+        oracleMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
+        finished = false;
+
+        databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
+        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+        specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+        specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+        oracleSnapshot = new OracleSnapshotBase(offsetStoreFileName);
+        oracleSnapshot.save(offset, oracleSnapshot.getFile());
+
+        Properties props = getEngineProps();
+
+        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
+                        io.debezium.engine.format.Json.class)
+                .using(props)
+                .notifying((records, committer) -> {
+                    try {
+                        for (ChangeEvent<String, String> record : records) {
+                            DebeziumFormat debeziumFormat = GSON
+                                    .fromJson(record.value(), DebeziumFormat.class);
+                            // put() blocks while the bounded queue is full
+                            oracleMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
+                            committer.markProcessed(record);
+                        }
+                        committer.markBatchFinished();
+                        long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                System.currentTimeMillis(), records.size(), dataSize);
+                        readerMetric.pluginReadSuccessCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                    } catch (Exception e) {
+                        if (e instanceof InterruptedException) {
+                            // put() was interrupted (destroy() calls executor.shutdownNow());
+                            // restore the flag so the engine thread can still observe the interrupt
+                            Thread.currentThread().interrupt();
+                        }
+                        readerMetric.pluginReadFailCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                        LOGGER.error("parse oracle message error", e);
+                    }
+                })
+                .using((success, message, error) -> {
+                    if (!success) {
+                        LOGGER.error("oracle job with jobConf {} has error {}", instanceId, message, error);
+                    }
+                }).build();
+
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(engine);
+
+        LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
+    }
+
+    /**
+     * Builds the Debezium engine properties from the fields populated in {@link #init(JobProfile)}.
+     * In specific-offsets snapshot mode the InLong offset store and database history are used,
+     * otherwise the file-based ones.
+     *
+     * @return properties handed to the Debezium engine
+     */
+    private Properties getEngineProps() {
+        Properties props = new Properties();
+        props.setProperty("name", "engine" + instanceId);
+        props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
+        props.setProperty("database.hostname", hostName);
+        props.setProperty("database.port", port);
+        props.setProperty("database.user", userName);
+        props.setProperty("database.password", password);
+        props.setProperty("database.dbname", dbName);
+        props.setProperty("database.server.name", serverName);
+        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+        props.setProperty("database.snapshot.mode", snapshotMode);
+        props.setProperty("key.converter.schemas.enable", "false");
+        props.setProperty("value.converter.schemas.enable", "false");
+        props.setProperty("snapshot.mode", snapshotMode);
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        props.setProperty("database.history.file.filename", databaseStoreHistoryName);
+        if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            // validate the configured values; the key constants themselves can never be null
+            Preconditions.checkNotNull(specificOffsetFile,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(specificOffsetPos,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
+            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
+            props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
+        } else {
+            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
+        }
+        props.setProperty("tombstones.on.delete", "false");
+        props.setProperty("converters", "datetime");
+        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
+        props.setProperty("datetime.format.date", "yyyy-MM-dd");
+        props.setProperty("datetime.format.time", "HH:mm:ss");
+        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+
+        // log a copy with the credential masked so the database password never reaches the log
+        Properties printableProps = new Properties();
+        printableProps.putAll(props);
+        printableProps.setProperty("database.password", "******");
+        LOGGER.info("oracle job {} start with props {}", jobProfile.getInstanceId(), printableProps);
+        return props;
+    }
+
+    /**
+     * Stops the engine executor and closes the snapshot helper. Only the first call has any
+     * effect, and calling it before {@link #init(JobProfile)} is safe.
+     */
+    @Override
+    public void destroy() {
+        synchronized (this) {
+            if (!destroyed) {
+                // both fields stay null until init() has run
+                if (this.executor != null) {
+                    this.executor.shutdownNow();
+                }
+                if (this.oracleSnapshot != null) {
+                    this.oracleSnapshot.close();
+                }
+                this.destroyed = true;
+            }
+        }
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index 23bfa1fbe..38f02e277 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -34,14 +34,11 @@ import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -212,8 +209,13 @@ public class PostgreSQLReader extends AbstractReader {
         props.setProperty("snapshot.mode", snapshotMode);
         props.setProperty("offset.storage.file.filename", offsetStoreFileName);
         if (PostgreSQLConstants.CUSTOM.equals(snapshotMode)) {
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
         }
@@ -229,28 +231,6 @@ public class PostgreSQLReader extends AbstractReader {
         return props;
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     @Override
     public void destroy() {
         synchronized (this) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index 079d9af07..3a6eeafcc 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -37,15 +37,12 @@ import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.inlong.agent.utils.GsonUtil;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -80,7 +77,7 @@ public class SQLServerReader extends AbstractReader {
     public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
     public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerReader.class);
     private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
 
     private static final Gson GSON = new Gson();
@@ -254,28 +251,6 @@ public class SQLServerReader extends AbstractReader {
         LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     private Properties getEngineProps() {
         Properties props = new Properties();
         props.setProperty("name", "engine" + instanceId);
@@ -294,8 +269,13 @@ public class SQLServerReader extends AbstractReader {
         props.setProperty("offset.storage.file.filename", offsetStoreFileName);
         props.setProperty("database.history.file.filename", databaseStoreHistoryName);
         if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
             props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java
new file mode 100644
index 000000000..4c19c3eea
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Oracle snapshot backed by a file: the content is read with {@code load} and encoded with {@code ENCODER}.
+ */
+public class OracleSnapshotBase extends AbstractSnapshot {
+    // NOTE(review): LOGGER is not referenced by any method of this class.
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotBase.class);
+    private final File file; // file whose content getSnapshot() loads
+
+    public OracleSnapshotBase(String filePath) { // only wraps the path in a File; nothing is read here
+        file = new File(filePath);
+    }
+
+    @Override
+    public String getSnapshot() { // file content via load(), turned into a string by ENCODER
+        byte[] offset = this.load(this.file);
+        return ENCODER.encodeToString(offset);
+    }
+
+    @Override
+    public void close() {
+        // no-op: this class keeps only a File reference, nothing that needs closing
+    }
+
+    public File getFile() { // the same file that getSnapshot() hands to load()
+        return file;
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
new file mode 100644
index 000000000..f577ed5db
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
+/**
+ * Manual check for {@link OracleReader}: the single method is ignored and polls the reader in an endless loop.
+ */
+public class TestOracleConnect {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestOracleConnect.class);
+
+    @Ignore // NOTE(review): no @Test here - presumably meant to be started by hand only
+    public void testOracle() {
+        JobProfile jobProfile = new JobProfile(); // filled with job.oracleJob.* connection settings below
+        jobProfile.set("job.oracleJob.hostname", "localhost");
+        jobProfile.set("job.oracleJob.port", "1521");
+        jobProfile.set("job.oracleJob.user", "c##dbzuser");
+        jobProfile.set("job.oracleJob.password", "dbz");
+        jobProfile.set("job.oracleJob.sid", "ORCLCDB");
+        jobProfile.set("job.oracleJob.dbname", "ORCLCDB");
+        jobProfile.set("job.oracleJob.serverName", "server1");
+        jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString()); // random ids per run
+        jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+        OracleReader oracleReader = new OracleReader();
+        oracleReader.init(jobProfile);
+        while (true) { // no exit condition: every non-null message is logged
+            Message message = oracleReader.read();
+            if (message != null) {
+                LOGGER.info("event content: {}", message);
+            }
+        }
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
new file mode 100644
index 000000000..db24e3acc
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.OracleConstants;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.field;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Test cases for {@link OracleReader}; its collaborators are replaced with PowerMock/Mockito mocks in setUp.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, OracleReader.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleReader {
+
+    private OracleReader reader;
+
+    @Mock
+    private JobProfile jobProfile;
+
+    @Mock
+    private AgentMetricItemSet agentMetricItemSet;
+
+    @Mock
+    private AgentMetricItem agentMetricItem;
+
+    @Mock
+    private OracleSnapshotBase oracleSnapshot;
+
+    @Mock
+    private DebeziumEngine.Builder builder;
+
+    @Mock
+    private ExecutorService executorService;
+
+    @Mock
+    private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue;
+
+    @Mock
+    private DebeziumEngine<ChangeEvent<String, String>> engine;
+
+    private AtomicLong atomicLong; // set as pluginReadCount on the mocked metric item
+
+    private AtomicLong atomicCountLong; // set as pluginReadSuccessCount on the mocked metric item
+
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
+
+    @Before
+    public void setUp() throws Exception { // stubs every collaborator, then creates and inits the reader
+        final String username = "sa";
+        final String password = "123456";
+        final String hostname = "127.0.0.1";
+        final String port = "1434";
+        final String groupId = "group01";
+        final String streamId = "stream01";
+        final String dbName = "testdb";
+        final String serverName = "serverName";
+        final String offsetFlushIntervalMs = "1000";
+        final String offsetStoreFileName = "/opt/offset.dat";
+        final String snapshotMode = OracleConstants.INITIAL;
+        final int queueSize = 1000;
+        final String databaseStoreHistoryName = "/opt/history.dat";
+        final String offset = "111";
+        final String specificOffsetFile = "";
+        final String specificOffsetPos = "-1";
+
+        atomicLong = new AtomicLong(0L);
+        atomicCountLong = new AtomicLong(0L);
+
+        when(jobProfile.getInstanceId()).thenReturn(instanceId);
+        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId);
+        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_USER))).thenReturn(username);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PASSWORD))).thenReturn(password);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PORT))).thenReturn(port);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_DBNAME))).thenReturn(dbName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn(
+                offsetFlushIntervalMs);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn(
+                offsetStoreFileName); // NOTE(review): offset file stubbed under the HISTORY_FILENAME key - confirm
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode);
+        when(jobProfile.getInt(eq(OracleReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn(databaseStoreHistoryName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn(
+                specificOffsetFile);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn(
+                specificOffsetPos);
+        whenNew(OracleSnapshotBase.class).withAnyArguments().thenReturn(oracleSnapshot);
+
+        //mock oracleMessageQueue
+        whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(oracleMessageQueue);
+
+        //mock DebeziumEngine
+        mockStatic(DebeziumEngine.class);
+        when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+        when(builder.using(any(Properties.class))).thenReturn(builder);
+        when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+        when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+        when(builder.build()).thenReturn(engine);
+
+        //mock executorService
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
+
+        //mock metrics
+        whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
+        when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+        field(AgentMetricItem.class, "pluginReadCount").set(agentMetricItem, atomicLong);
+        field(AgentMetricItem.class, "pluginReadSuccessCount").set(agentMetricItem, atomicCountLong);
+
+        //init method
+        mockStatic(MetricRegister.class);
+        (reader = new OracleReader()).init(jobProfile); // assigns the field, then calls init on the new reader
+    }
+
+    /**
+     * Test cases for {@link OracleReader#read()}.
+     */
+    @Test
+    public void testRead() throws Exception {
+        final String right = "value";
+        final String left = "key";
+        final String dataKey = "dataKey";
+        when(oracleMessageQueue.isEmpty()).thenReturn(true);
+        assertEquals(null, reader.read()); // read() is expected to return null while the queue reports empty
+        when(oracleMessageQueue.isEmpty()).thenReturn(false);
+        when(oracleMessageQueue.poll()).thenReturn(Pair.of(left, right));
+        Message result = reader.read();
+        assertEquals(String.join(right, "\"", "\""), result.toString()); // expected text: "value" incl. quotes
+        assertEquals(left, result.getHeader().get(dataKey)); // pair's left side is expected under "dataKey"
+    }
+
+    /**
+     * Test cases for {@link OracleReader#destroy()}.
+     */
+    @Test
+    public void testDestroy() throws Exception {
+        assertFalse(reader.isDestroyed());
+        reader.destroy();
+        verify(executorService).shutdownNow(); // destroy() is expected to stop the executor ...
+        verify(oracleSnapshot).close(); // ... and to close the snapshot
+        assertTrue(reader.isDestroyed());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#finishRead()}.
+     */
+    @Test
+    public void testFinishRead() throws Exception {
+        assertFalse(reader.isFinished());
+        reader.finishRead();
+        assertTrue(reader.isFinished());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#isSourceExist()}.
+     */
+    @Test
+    public void testIsSourceExist() {
+        assertTrue(reader.isSourceExist());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#getSnapshot()}.
+     */
+    @Test
+    public void testGetSnapshot() {
+        final String snapShort = "snapShort";
+        when(oracleSnapshot.getSnapshot()).thenReturn(snapShort);
+        assertEquals(snapShort, reader.getSnapshot()); // the mocked snapshot value is expected back unchanged
+    }
+
+    /**
+     * Test cases for {@link OracleReader#getReadSource()}.
+     */
+    @Test
+    public void testGetReadSource() {
+        assertEquals(instanceId, reader.getReadSource());
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
new file mode 100644
index 000000000..160357e77
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.common.metric.MetricItem;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.powermock.api.support.membermodification.MemberMatcher.field;
+
+/**
+ * Test cases for {@link OracleSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({OracleSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleSource {
+
+    @Mock
+    JobProfile jobProfile;
+
+    @Mock
+    private AgentMetricItemSet agentMetricItemSet;
+
+    @Mock
+    private AgentMetricItem agentMetricItem;
+
+    private AtomicLong sourceSuccessCount; // set on the mocked metric item in setup()
+
+    private AtomicLong sourceFailCount; // set on the mocked metric item in setup()
+
+    @Before
+    public void setup() throws Exception { // mocks metric item creation and the static MetricRegister
+        sourceSuccessCount = new AtomicLong(0);
+        sourceFailCount = new AtomicLong(0);
+
+        // mock metrics
+        whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
+        when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+        field(AgentMetricItem.class, "sourceSuccessCount").set(agentMetricItem, sourceSuccessCount);
+        field(AgentMetricItem.class, "sourceFailCount").set(agentMetricItem, sourceFailCount);
+        PowerMockito.mockStatic(MetricRegister.class);
+        PowerMockito.doNothing().when(
+                MetricRegister.class, "register", any(MetricItem.class));
+    }
+
+    /**
+     * Test cases for {@link OracleSource#split(JobProfile)}.
+     */
+    @Test
+    public void testSplit() {
+
+        // create the source under test (a real instance; only jobProfile is a mock)
+        final OracleSource source = new OracleSource();
+        // split is expected to return exactly one element for the mocked profile
+        assertEquals(1, source.split(jobProfile).size());
+    }
+}
diff --git a/pom.xml b/pom.xml
index 08785dc82..7f0f54c35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -623,6 +623,13 @@
                 <artifactId>debezium-connector-postgres</artifactId>
                 <version>${debezium.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.debezium</groupId>
+                <artifactId>debezium-connector-oracle</artifactId>
+                <version>${debezium.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.h2database</groupId>
                 <artifactId>h2</artifactId>