Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/09 11:25:30 UTC

[inlong] branch branch-1.4 updated (a39c8fd9b -> 287644e7c)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from a39c8fd9b [INLONG-6379][Sort] Complement iceberg multiple sink metric data compute (#6472)
     new ec6d0ca59 [INLONG-6475][Docker] Add a base flink environment for docker-compose (#6476)
     new f75539106 [INLONG-6477][Manager] Add consume API in the manager client (#6480)
     new a5aa18265 [INLONG-6470][Dashboard] Supports management of PostgreSQL source (#6478)
     new 1d298a1b8 [INLONG-6482][Dashboard] Sink management distinguishes between save-only and save-and-submit processes (#6483)
     new d4d737167 [INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464)
     new 16dfc8afc [INLONG-6176][Agent] Support collect data from Oracle (#6203)
     new 287644e7c [INLONG-6406][DataProxy] Should support creating sink dynamically after started (addendum) (#6488)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker/docker-compose/docker-compose.yml           |  25 +-
 .../inlong/agent/constant/OracleConstants.java     |  50 ++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  53 +-
 .../pojo/{SqlServerJob.java => OracleJob.java}     |  11 +-
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../{SQLServerSource.java => OracleSource.java}    |  96 ++--
 .../plugin/sources/reader/AbstractReader.java      |  35 ++
 .../agent/plugin/sources/reader/BinlogReader.java  |  32 +-
 .../agent/plugin/sources/reader/MongoDBReader.java |  32 +-
 .../{SQLServerReader.java => OracleReader.java}    | 634 ++++++++++-----------
 .../plugin/sources/reader/PostgreSQLReader.java    |  32 +-
 .../plugin/sources/reader/SQLServerReader.java     |  34 +-
 ...erSnapshotBase.java => OracleSnapshotBase.java} |   8 +-
 ...QLServerConnect.java => TestOracleConnect.java} | 127 ++---
 ...tSQLServerReader.java => TestOracleReader.java} |  83 ++-
 ...tSQLServerSource.java => TestOracleSource.java} | 180 +++---
 inlong-dashboard/src/locales/cn.json               |  14 +-
 inlong-dashboard/src/locales/en.json               |  12 +
 .../src/metas/groups/defaults/index.ts             |   2 +-
 .../sources/defaults/{Oracle.ts => PostgreSQL.ts}  |  95 ++-
 .../src/metas/sources/defaults/index.ts            |   5 +
 .../pages/GroupDetail/DataStorage/DetailModal.tsx  |  24 +-
 .../src/pages/GroupDetail/DataStorage/index.tsx    |   1 +
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |   5 +-
 .../org/apache/inlong/dataproxy/sink/TubeSink.java |  20 +-
 .../dataproxy/sink/common/TubeProducerHolder.java  |   6 +-
 .../inlong/manager/client/api/InlongConsume.java}  |  60 +-
 .../manager/client/api/impl/InlongConsumeImpl.java |  89 +++
 .../client/api/inner/client/ClientFactory.java     |   2 +
 .../api/inner/client/InlongConsumeClient.java      | 147 +++++
 .../{DataNodeApi.java => InlongConsumeApi.java}    |  39 +-
 .../client/api/inner/ClientFactoryTest.java        |   3 +
 .../client/api/inner/InlongConsumeClientTest.java  | 160 ++++++
 .../queue/pulsar/PulsarResourceOperator.java       | 163 +++---
 .../web/controller/InlongConsumeController.java    |   5 +-
 pom.xml                                            |   7 +
 36 files changed, 1394 insertions(+), 902 deletions(-)
 create mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java
 copy inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/{SqlServerJob.java => OracleJob.java} (90%)
 copy inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/{SQLServerSource.java => OracleSource.java} (80%)
 copy inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/{SQLServerReader.java => OracleReader.java} (72%)
 copy inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/{SqlServerSnapshotBase.java => OracleSnapshotBase.java} (88%)
 copy inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/{TestSQLServerConnect.java => TestOracleConnect.java} (68%)
 copy inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/{TestSQLServerReader.java => TestOracleReader.java} (68%)
 copy inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/{TestSQLServerSource.java => TestOracleSource.java} (90%)
 copy inlong-dashboard/src/metas/sources/defaults/{Oracle.ts => PostgreSQL.ts} (71%)
 copy inlong-manager/{manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java => manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java} (59%)
 create mode 100644 inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java
 create mode 100644 inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
 copy inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/{DataNodeApi.java => InlongConsumeApi.java} (53%)
 create mode 100644 inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java


[inlong] 05/07: [INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d4d737167d2f5df013cb457939f2def6961c04c4
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Nov 9 17:48:51 2022 +0800

    [INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464)
---
 .../queue/pulsar/PulsarResourceOperator.java       | 163 ++++++++++++---------
 1 file changed, 95 insertions(+), 68 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 5b698b856..23c13b740 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -45,6 +44,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -84,42 +84,51 @@ public class PulsarResourceOperator implements QueueResourceOperator {
 
         // get pulsar cluster via the inlong cluster tag from the inlong group
         String clusterTag = groupInfo.getInlongClusterTag();
-        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(clusterTag, null,
-                ClusterType.PULSAR);
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            // create pulsar tenant and namespace
-            String tenant = pulsarCluster.getTenant();
-            if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
-            }
-            String namespace = groupInfo.getMqResource();
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-            // if the group was not successful, need create tenant and namespace
-            if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
-                pulsarOperator.createTenant(pulsarAdmin, tenant);
-                log.info("success to create pulsar tenant for groupId={}, tenant={}", groupId, tenant);
-                pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
-                log.info("success to create pulsar namespace for groupId={}, namespace={}", groupId, namespace);
-            }
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+            try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+                String clusterName = pulsarCluster.getName();
+                // create pulsar tenant and namespace
+                String tenant = pulsarCluster.getTenant();
+                if (StringUtils.isEmpty(tenant)) {
+                    tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+                }
+                String namespace = groupInfo.getMqResource();
+                InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+                // if the group was not successful, we need to create the tenant and namespace
+                if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
+                    pulsarOperator.createTenant(pulsarAdmin, tenant);
+                    log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}",
+                            groupId, tenant, clusterName);
+                    pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
+                    log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
+                            groupId, namespace, clusterName);
+                }
 
-            // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
-                return;
+                // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
+                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+                if (streamInfoList == null || streamInfoList.isEmpty()) {
+                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
+                            groupId, clusterName);
+                    return;
+                }
+                // create pulsar topic and subscription
+                for (InlongStreamBriefInfo stream : streamInfoList) {
+                    this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
+                    this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(),
+                            stream.getInlongStreamId());
+                }
+            } catch (Exception e) {
+                String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
+                        pulsarCluster.toString());
+                log.error(msg, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
-            // create pulsar topic and subscription
-            for (InlongStreamBriefInfo stream : streamInfoList) {
-                this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
-                this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), stream.getInlongStreamId());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create pulsar resource for groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
         }
-
-        log.info("success to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
+        log.info("success to create pulsar resource for groupId={}", groupId);
     }
 
     @Override
@@ -129,22 +138,28 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         String groupId = groupInfo.getInlongGroupId();
         log.info("begin to delete pulsar resource for groupId={}", groupId);
 
-        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
-        try {
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
+            String clusterName = clusterInfo.getName();
+            try {
+                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+                if (streamInfoList == null || streamInfoList.isEmpty()) {
+                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
+                            groupId, clusterName);
+                    return;
+                }
+                for (InlongStreamBriefInfo streamInfo : streamInfoList) {
+                    this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+                }
+            } catch (Exception e) {
+                log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e);
+                throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
             }
-        } catch (Exception e) {
-            log.error("failed to delete pulsar resource for groupId=" + groupId, e);
-            throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
         }
-
-        log.info("success to delete pulsar resource for groupId={}, cluster={}", groupId, clusterInfo);
+        log.info("success to delete pulsar resource for groupId={}", groupId);
     }
 
     @Override
@@ -157,17 +172,22 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         String streamId = streamInfo.getInlongStreamId();
         log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
 
-        try {
-            // get pulsar cluster via the inlong cluster tag from the inlong group
-            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
-                    groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
-            // create pulsar topic and subscription
-            this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
-            this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource(), streamId);
-        } catch (Exception e) {
-            String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+            try {
+                // create pulsar topic and subscription
+                this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
+                this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster,
+                        streamInfo.getMqResource(), streamId);
+            } catch (Exception e) {
+                String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
+                        groupId, streamId, pulsarCluster.getName());
+                log.error(msg, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
+            }
         }
 
         log.info("success to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
@@ -182,16 +202,23 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         String streamId = streamInfo.getInlongStreamId();
         log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId);
 
-        try {
-            ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
-            this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
-            log.info("success to delete pulsar topic for groupId={}, streamId={}", groupId, streamId);
-        } catch (Exception e) {
-            String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg);
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
+            String clusterName = clusterInfo.getName();
+            try {
+                this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+                log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}",
+                        groupId, streamId, clusterName);
+            } catch (Exception e) {
+                String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s",
+                        groupId, streamId, clusterName);
+                log.error(msg, e);
+                throw new WorkflowListenerException(msg);
+            }
         }
-
         log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId);
     }
 

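The pattern above repeats in all four queue operations: resolve every Pulsar
cluster bound to the group's inlong cluster tag, then apply the same
topic/subscription work per cluster, tagging logs and errors with the cluster
name. A condensed sketch of that loop, assuming the injected clusterService
and the POJO types imported by the file; forEachPulsarCluster itself is a
hypothetical helper, not InLong API:

    // Sketch only: listByTagAndType(..) and PulsarClusterInfo come from the
    // diff above; the per-cluster action is left as a placeholder.
    private void forEachPulsarCluster(String clusterTag,
            java.util.function.Consumer<PulsarClusterInfo> action) {
        List<PulsarClusterInfo> pulsarClusters =
                clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream()
                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
                        .collect(Collectors.toList());
        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
            // each cluster gets the same topic/subscription treatment
            action.accept(pulsarCluster);
        }
    }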

[inlong] 07/07: [INLONG-6406][DataProxy] Should support creating sink dynamically after started (addendum) (#6488)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 287644e7c8db46771621ca735467624a966c0058
Author: Goson Zhang <46...@qq.com>
AuthorDate: Wed Nov 9 18:40:10 2022 +0800

    [INLONG-6406][DataProxy] Should support creating sink dynamically after started (addendum) (#6488)
---
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java |  5 +----
 .../org/apache/inlong/dataproxy/sink/TubeSink.java   | 20 +++++++-------------
 .../dataproxy/sink/common/TubeProducerHolder.java    |  6 +++++-
 3 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 6fd938030..0e0330231 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -306,7 +306,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
 
         pulsarCluster = configManager.getMqClusterUrl2Token();
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
             ConfigManager.getInstance().updMqClusterStatus(true);
             logger.info("[{}] MQ Cluster service status ready!", getName());
         }
@@ -350,9 +349,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                         ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
         this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
         MetricRegister.register(metricItemSet);
-        if (ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
-        }
+        this.canTake = true;
         logger.info("[{}] Pulsar sink started", getName());
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index dee798214..e1dbf90a3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -76,7 +76,6 @@ public class TubeSink extends AbstractSink implements Configurable {
     private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
     private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
     private TubeProducerHolder producerHolder = null;
-    private volatile boolean canTake = false;
     private volatile boolean canSend = false;
     private volatile boolean isOverFlow = false;
     private ConfigManager configManager;
@@ -212,9 +211,6 @@ public class TubeSink extends AbstractSink implements Configurable {
         // start the cleaner thread
         super.start();
         this.canSend = true;
-        if (ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
-        }
         for (int i = 0; i < sinkThreadPool.length; i++) {
             sinkThreadPool[i] = new Thread(new TubeSinkTask(),
                     getName() + "_tube_sink_sender-" + i);
@@ -229,7 +225,6 @@ public class TubeSink extends AbstractSink implements Configurable {
             logger.info("Duplicated call, " + getName() + " has stopped!");
             return;
         }
-        this.canTake = false;
         // waiting inflight message processed
         int waitCount = 0;
         while (takenMsgCnt.get() > 0 && waitCount++ < 10) {
@@ -269,7 +264,7 @@ public class TubeSink extends AbstractSink implements Configurable {
 
     @Override
     public Status process() throws EventDeliveryException {
-        if (!this.canTake) {
+        if (!this.started.get()) {
             return Status.BACKOFF;
         }
         Status status = Status.READY;
@@ -319,6 +314,10 @@ public class TubeSink extends AbstractSink implements Configurable {
             logger.info("sink task {} started.", Thread.currentThread().getName());
             while (canSend) {
                 try {
+                    if (!started.get() && cachedMsgCnt.get() <= 0) {
+                        logger.info("Found started is false and taken message count is zero, braek!");
+                        break;
+                    }
                     if (isOverFlow) {
                         isOverFlow = false;
                         Thread.sleep(30);
@@ -327,10 +326,6 @@ public class TubeSink extends AbstractSink implements Configurable {
                     if (resendQueue.isEmpty()) {
                         event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
                         if (event == null) {
-                            if (!canTake && takenMsgCnt.get() <= 0) {
-                                logger.info("Found canTake is false and taken message count is zero, braek!");
-                                break;
-                            }
                             continue;
                         }
                         cachedMsgCnt.decrementAndGet();
@@ -534,7 +529,7 @@ public class TubeSink extends AbstractSink implements Configurable {
 
         @Override
         public void run() {
-            if (!canTake && takenMsgCnt.get() <= 0) {
+            if (!started.get()) {
                 return;
             }
             logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + cachedMsgCnt.get()
@@ -621,7 +616,7 @@ public class TubeSink extends AbstractSink implements Configurable {
             if (producerHolder != null) {
                 try {
                     producerHolder.createProducersByTopicSet(addedTopics);
-                } catch (Exception e) {
+                } catch (Throwable e) {
                     logger.info(getName() + "'s publish new topic set fail.", e);
                 }
             }
@@ -675,7 +670,6 @@ public class TubeSink extends AbstractSink implements Configurable {
             tmpProducerHolder.stop();
         }
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
             ConfigManager.getInstance().updMqClusterStatus(true);
             logger.info("[{}] MQ Cluster service status ready!", getName());
         }
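
The addendum drops the sink-local canTake flag: process() now backs off until
the lifecycle flag started is set, and the sender threads keep draining cached
messages after a stop request instead of exiting as soon as the queue is
empty. A minimal sketch of that shutdown handshake, with started assumed to be
an AtomicBoolean flipped by the sink's start()/stop() methods (not shown in
this hunk):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the drain-then-exit loop above; the real TubeSinkTask also
    // handles the resend queue, overflow backoff, and the actual send.
    class SenderLoopSketch implements Runnable {
        final AtomicBoolean started = new AtomicBoolean(true);
        final AtomicLong cachedMsgCnt = new AtomicLong();

        @Override
        public void run() {
            while (true) {
                if (!started.get() && cachedMsgCnt.get() <= 0) {
                    break; // stopped and nothing left to flush
                }
                // ... poll the event queue and send one cached message ...
            }
        }
    }
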
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
index 04b43c87f..37f828d24 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
@@ -263,7 +263,11 @@ public class TubeProducerHolder {
                 lastProducer = sessionFactory.createProducer();
                 lastPubTopicCnt.set(0);
             }
-            lastProducer.publish(subTopicSet);
+            try {
+                lastProducer.publish(subTopicSet);
+            } catch (Throwable e) {
+                logger.info(sinkName + " meta sink publish failed.", e);
+            }
             lastPubTopicCnt.addAndGet(subTopicSet.size());
             for (String topicItem : subTopicSet) {
                 producerMap.put(topicItem, lastProducer);


[inlong] 01/07: [INLONG-6475][Docker] Add a base flink environment for docker-compose (#6476)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ec6d0ca59e7da23bbcae7e2fdf2191d41717aaec
Author: Charles Zhang <do...@apache.org>
AuthorDate: Wed Nov 9 17:31:17 2022 +0800

    [INLONG-6475][Docker] Add a base flink environment for docker-compose (#6476)
---
 docker/docker-compose/docker-compose.yml | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/docker/docker-compose/docker-compose.yml b/docker/docker-compose/docker-compose.yml
index 8cce345f2..b3db40fcc 100644
--- a/docker/docker-compose/docker-compose.yml
+++ b/docker/docker-compose/docker-compose.yml
@@ -55,7 +55,7 @@ services:
       - USERNAME=root
       - PASSWORD=inlong
       - ZK_URL=tubemq-server:2181
-      - FLINK_HOST=localhost
+      - FLINK_HOST=jobmanager
       - FLINK_PORT=8081
       - AUDIT_PROXY_URL=audit:10081
 
@@ -114,3 +114,26 @@ services:
       - MQ_TYPE=pulsar
     ports:
       - "10081:10081"
+
+  # flink jobmanager
+  jobmanager:
+    image: apache/flink:1.13-scala_2.11
+    container_name: jobmanager
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+    ports:
+      - "8081:8081"
+    command: jobmanager
+
+  # flink taskmanager
+  taskmanager:
+    image: apache/flink:1.13-scala_2.11
+    container_name: taskmanager
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+        taskmanager.numberOfTaskSlots: 2
+    command: taskmanager
\ No newline at end of file


[inlong] 03/07: [INLONG-6470][Dashboard] Supports management of PostgreSQL source (#6478)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit a5aa182651e41a2d568a4b6fe8b55102d765f1f7
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Wed Nov 9 17:38:40 2022 +0800

    [INLONG-6470][Dashboard] Supports management of PostgreSQL source (#6478)
---
 inlong-dashboard/src/locales/cn.json               |  11 +-
 inlong-dashboard/src/locales/en.json               |   9 ++
 .../src/metas/sources/defaults/PostgreSQL.ts       | 154 +++++++++++++++++++++
 .../src/metas/sources/defaults/index.ts            |   5 +
 4 files changed, 178 insertions(+), 1 deletion(-)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index c5ad9c67a..e99371b9c 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -44,11 +44,20 @@
   "meta.Sources.Oracle.Username": "用户",
   "meta.Sources.Oracle.Password": "密码",
   "meta.Sources.Oracle.Database": "数据库名",
-  "meta.Sources.Oracle.SchemaName": "集合名称",
+  "meta.Sources.Oracle.SchemaName": "架构名称",
   "meta.Sources.Oracle.TableName": "表格名称",
   "meta.Sources.Oracle.AllMigration": "是否整库迁移",
   "meta.Sources.Oracle.ScanStartupMode": "扫描启动模式",
   "meta.Sources.Oracle.PrimaryKey": "主键",
+  "meta.Sources.PostgreSQL.Hostname": "服务器主机",
+  "meta.Sources.PostgreSQL.Port": "端口",
+  "meta.Sources.PostgreSQL.Username": "用户",
+  "meta.Sources.PostgreSQL.Password": "密码",
+  "meta.Sources.PostgreSQL.Database": "数据库名",
+  "meta.Sources.PostgreSQL.SchemaName": "架构名称",
+  "meta.Sources.PostgreSQL.TableName": "表格名称",
+  "meta.Sources.PostgreSQL.decodingPluginName": "解码插件名称",
+  "meta.Sources.PostgreSQL.PrimaryKey": "主键",
   "meta.Sinks.SinkName": "名称",
   "meta.Sinks.SinkNameRule": "以英文字母开头,只能包含英文字母、数字、中划线、下划线",
   "meta.Sinks.SinkType": "类型",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index dc7f103a1..255f56996 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -49,6 +49,15 @@
   "meta.Sources.Oracle.AllMigration": "AllMigration",
   "meta.Sources.Oracle.ScanStartupMode": "Scan startup mode",
   "meta.Sources.Oracle.PrimaryKey": "PrimaryKey",
+  "meta.Sources.PostgreSQL.Hostname": "Hostname",
+  "meta.Sources.PostgreSQL.Port": "Port",
+  "meta.Sources.PostgreSQL.Username": "Username",
+  "meta.Sources.PostgreSQL.Password": "Password",
+  "meta.Sources.PostgreSQL.Database": "Database",
+  "meta.Sources.PostgreSQL.SchemaName": "SchemaName",
+  "meta.Sources.PostgreSQL.TableName": "TableName",
+  "meta.Sources.PostgreSQL.decodingPluginName": "Decoding Plugin Name",
+  "meta.Sources.PostgreSQL.PrimaryKey": "PrimaryKey",
   "meta.Sinks.SinkName": "Name",
   "meta.Sinks.SinkNameRule": "At the beginning of English letters, only English letters, numbers, minus, and underscores",
   "meta.Sinks.SinkType": "Type",
diff --git a/inlong-dashboard/src/metas/sources/defaults/PostgreSQL.ts b/inlong-dashboard/src/metas/sources/defaults/PostgreSQL.ts
new file mode 100644
index 000000000..6bd69cc3f
--- /dev/null
+++ b/inlong-dashboard/src/metas/sources/defaults/PostgreSQL.ts
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { DataWithBackend } from '@/metas/DataWithBackend';
+import { RenderRow } from '@/metas/RenderRow';
+import { RenderList } from '@/metas/RenderList';
+import { SourceInfo } from '../common/SourceInfo';
+
+const { I18n } = DataWithBackend;
+const { FieldDecorator } = RenderRow;
+const { ColumnDecorator } = RenderList;
+
+export default class PostgreSQLSource
+  extends SourceInfo
+  implements DataWithBackend, RenderRow, RenderList
+{
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.PostgreSQL.Hostname')
+  hostname: string;
+
+  @FieldDecorator({
+    type: 'inputnumber',
+    rules: [{ required: true }],
+    initialValue: 5432,
+    props: values => ({
+      min: 1,
+      max: 65535,
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.PostgreSQL.Port')
+  port: number;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.PostgreSQL.Database')
+  database: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.PostgreSQL.SchemaName')
+  schema: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.PostgreSQL.Username')
+  username: string;
+
+  @FieldDecorator({
+    type: 'password',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.PostgreSQL.Password')
+  password: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.PostgreSQL.TableName')
+  tableName: Record<string, unknown>;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.PostgreSQL.PrimaryKey')
+  primaryKey: string;
+
+  @FieldDecorator({
+    type: 'select',
+    initialValue: 'decoderbufs',
+    props: values => ({
+      disabled: values?.status === 101,
+      options: [
+        {
+          label: 'decoderbufs',
+          value: 'decoderbufs',
+        },
+        {
+          label: 'wal2json',
+          value: 'wal2json',
+        },
+        {
+          label: 'wal2json_rds',
+          value: 'wal2json_rds',
+        },
+        {
+          label: 'wal2json_streaming',
+          value: 'wal2json_streaming',
+        },
+        {
+          label: 'wal2json_rds_streaming',
+          value: 'wal2json_rds_streaming',
+        },
+        {
+          label: 'pgoutput',
+          value: 'pgoutput',
+        },
+      ],
+    }),
+  })
+  @I18n('meta.Sources.PostgreSQL.decodingPluginName')
+  decodingPluginName: string;
+}
diff --git a/inlong-dashboard/src/metas/sources/defaults/index.ts b/inlong-dashboard/src/metas/sources/defaults/index.ts
index 881a7ff9a..15e30fcc7 100644
--- a/inlong-dashboard/src/metas/sources/defaults/index.ts
+++ b/inlong-dashboard/src/metas/sources/defaults/index.ts
@@ -51,4 +51,9 @@ export const allDefaultSources: MetaExportWithBackendList<SourceMetaType> = [
     value: 'ORACLE',
     LoadEntity: () => import('./Oracle'),
   },
+  {
+    label: 'PostgreSQL',
+    value: 'POSTGRESQL',
+    LoadEntity: () => import('./PostgreSQL'),
+  },
 ];
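
The decodingPluginName options above match the logical decoding plugins that
Debezium's PostgreSQL connector accepts for its plugin.name property
(decoderbufs is also Debezium's default). A hedged sketch of where such a
value would land, assuming the dashboard field is eventually forwarded into
the connector configuration:

    import java.util.Properties;

    class PostgresDecodingPluginSketch {
        // Illustrative only: "plugin.name" is the documented Debezium key;
        // the wiring from the dashboard field to this point is assumed.
        static Properties connectorProps(String decodingPluginName) {
            Properties props = new Properties();
            props.setProperty("plugin.name", decodingPluginName);
            return props;
        }
    }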


[inlong] 06/07: [INLONG-6176][Agent] Support collect data from Oracle (#6203)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 16dfc8afc748c4a0174a26984d67cd613f83def2
Author: haibo.duan <dh...@live.cn>
AuthorDate: Wed Nov 9 18:35:16 2022 +0800

    [INLONG-6176][Agent] Support collect data from Oracle (#6203)
---
 .../inlong/agent/constant/OracleConstants.java     |  50 ++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  53 +-
 .../org/apache/inlong/agent/pojo/OracleJob.java    |  75 +++
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../inlong/agent/plugin/sources/OracleSource.java  |  48 ++
 .../plugin/sources/reader/AbstractReader.java      |  35 ++
 .../agent/plugin/sources/reader/BinlogReader.java  |  32 +-
 .../agent/plugin/sources/reader/MongoDBReader.java |  32 +-
 .../{SQLServerReader.java => OracleReader.java}    | 634 ++++++++++-----------
 .../plugin/sources/reader/PostgreSQLReader.java    |  32 +-
 .../plugin/sources/reader/SQLServerReader.java     |  34 +-
 .../sources/snapshot/OracleSnapshotBase.java       |  52 ++
 .../agent/plugin/sources/TestOracleConnect.java    |  62 ++
 .../agent/plugin/sources/TestOracleReader.java     | 234 ++++++++
 .../agent/plugin/sources/TestOracleSource.java     |  90 +++
 pom.xml                                            |   7 +
 16 files changed, 1037 insertions(+), 438 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java
new file mode 100644
index 000000000..6ba84a7c3
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+public class OracleConstants {
+
+    /**
+     * The snapshot includes the structure and data of the captured tables.
+     * Specify this value to populate topics with a complete representation of the data from the captured tables.
+     */
+    public static final String INITIAL = "initial";
+
+    /**
+     * The snapshot includes the structure and data of the captured tables.
+     * The connector performs an initial snapshot and then stops, without processing any subsequent changes.
+     */
+    public static final String INITIAL_ONLY = "initial_only";
+
+    /**
+     * The snapshot includes only the structure of captured tables.
+     * Specify this value if you want the connector to capture data only for changes that occur after the snapshot.
+     */
+    public static final String SCHEMA_ONLY = "schema_only";
+
+    /**
+     * This is a recovery setting for a connector that has already been capturing changes.
+     * When you restart the connector, this setting enables recovery of a corrupted or lost database history topic.
+     * You might set it periodically to "clean up" a database history topic that has been growing unexpectedly.
+     * Database history topics require infinite retention. Note this mode is only safe to use when it is guaranteed
+     * that no schema changes happened between the point in time the connector was shut down and the point in time the
+     * snapshot is taken.
+     */
+    public static final String SCHEMA_ONLY_RECOVERY = "schema_only_recovery";
+
+}
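
The javadoc above paraphrases the snapshot.mode values of Debezium's Oracle
connector, which is what these constants are written against. A hedged sketch
of one constant being applied to an embedded-engine configuration (the props
object and its hand-off to the engine are assumed):

    import java.util.Properties;

    class OracleSnapshotModeSketch {
        static Properties debeziumProps() {
            Properties props = new Properties();
            // "snapshot.mode" is the Debezium property these constants feed;
            // INITIAL snapshots both structure and data before streaming.
            props.setProperty("snapshot.mode", OracleConstants.INITIAL);
            return props;
        }
    }
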
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 810a42d2b..f967dd1a8 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -54,6 +54,10 @@ public class JobProfileDto {
      * mongo source
      */
     public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource";
+    /**
+     * oracle source
+     */
+    public static final String ORACLE_SOURCE = "org.apache.inlong.agent.plugin.sources.OracleSource";
     /**
      * mqtt source
      */
@@ -230,10 +234,10 @@ public class JobProfileDto {
         return mongoJob;
     }
 
-    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
-        SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
-                SqlServerJob.SqlserverJobConfig.class);
-        SqlServerJob oracleJob = new SqlServerJob();
+    private static OracleJob getOracleJob(DataConfig dataConfigs) {
+        OracleJob.OracleJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+                OracleJob.OracleJobConfig.class);
+        OracleJob oracleJob = new OracleJob();
         oracleJob.setUser(config.getUser());
         oracleJob.setHostname(config.getHostname());
         oracleJob.setPassword(config.getPassword());
@@ -241,23 +245,51 @@ public class JobProfileDto {
         oracleJob.setServerName(config.getServerName());
         oracleJob.setDbname(config.getDbname());
 
-        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        OracleJob.Offset offset = new OracleJob.Offset();
         offset.setFilename(config.getOffsetFilename());
         offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
         offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
         oracleJob.setOffset(offset);
 
-        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        OracleJob.Snapshot snapshot = new OracleJob.Snapshot();
         snapshot.setMode(config.getSnapshotMode());
         oracleJob.setSnapshot(snapshot);
 
-        SqlServerJob.History history = new SqlServerJob.History();
+        OracleJob.History history = new OracleJob.History();
         history.setFilename(config.getHistoryFilename());
         oracleJob.setHistory(history);
 
         return oracleJob;
     }
 
+    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+        SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+                SqlServerJob.SqlserverJobConfig.class);
+        SqlServerJob sqlServerJob = new SqlServerJob();
+        sqlServerJob.setUser(config.getUser());
+        sqlServerJob.setHostname(config.getHostname());
+        sqlServerJob.setPassword(config.getPassword());
+        sqlServerJob.setPort(config.getPort());
+        sqlServerJob.setServerName(config.getServerName());
+        sqlServerJob.setDbname(config.getDbname());
+
+        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        sqlServerJob.setOffset(offset);
+
+        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        sqlServerJob.setSnapshot(snapshot);
+
+        SqlServerJob.History history = new SqlServerJob.History();
+        history.setFilename(config.getHistoryFilename());
+        sqlServerJob.setHistory(history);
+
+        return sqlServerJob;
+    }
+
     public static MqttJob getMqttJob(DataConfig dataConfigs) {
         MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
                 MqttJob.MqttJobConfig.class);
@@ -341,6 +373,12 @@ public class JobProfileDto {
                 job.setSource(KAFKA_SOURCE);
                 profileDto.setJob(job);
                 break;
+            case ORACLE:
+                OracleJob oracleJob = getOracleJob(dataConfig);
+                job.setOracleJob(oracleJob);
+                job.setSource(ORACLE_SOURCE);
+                profileDto.setJob(job);
+                break;
             case SQLSERVER:
                 SqlServerJob sqlserverJob = getSqlServerJob(dataConfig);
                 job.setSqlserverJob(sqlserverJob);
@@ -385,6 +423,7 @@ public class JobProfileDto {
         private FileJob fileJob;
         private BinlogJob binlogJob;
         private KafkaJob kafkaJob;
+        private OracleJob oracleJob;
         private MongoJob mongoJob;
         private MqttJob mqttJob;
         private SqlServerJob sqlserverJob;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java
new file mode 100644
index 000000000..ef2420b3a
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class OracleJob {
+
+    private String hostname;
+    private String user;
+    private String password;
+    private String port;
+    private String serverName;
+    private String dbname;
+
+    private OracleJob.Snapshot snapshot;
+    private OracleJob.Offset offset;
+    private OracleJob.History history;
+
+    @Data
+    public static class Offset {
+
+        private String intervalMs;
+        private String filename;
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+
+    @Data
+    public static class Snapshot {
+
+        private String mode;
+    }
+
+    @Data
+    public static class History {
+
+        private String filename;
+    }
+
+    @Data
+    public static class OracleJobConfig {
+
+        private String hostname;
+        private String user;
+        private String password;
+        private String port;
+        private String dbname;
+        private String serverName;
+
+        private String snapshotMode;
+        private String intervalMs;
+        private String offsetFilename;
+        private String historyFilename;
+
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+}
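
JobProfileDto.getOracleJob(..) above rebuilds this POJO from the job's
extParams JSON via Gson. A small sketch of that parse with a purely
hypothetical payload (keys follow OracleJobConfig; the getters come from
Lombok's @Data):

    import com.google.gson.Gson;

    class OracleJobConfigSketch {
        public static void main(String[] args) {
            // Hypothetical extParams payload, for illustration only.
            String extParams = "{\"hostname\":\"127.0.0.1\",\"port\":\"1521\","
                    + "\"user\":\"inlong\",\"dbname\":\"ORCL\",\"snapshotMode\":\"initial\"}";
            OracleJob.OracleJobConfig config =
                    new Gson().fromJson(extParams, OracleJob.OracleJobConfig.class);
            System.out.println(config.getSnapshotMode()); // prints: initial
        }
    }
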
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 7bc8eb0f6..9f0bba001 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -85,6 +85,11 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-oracle</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-sqlserver</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
new file mode 100644
index 000000000..b78f18dac
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Oracle SQL source
+ */
+public class OracleSource extends AbstractSource {
+
+    private static final Logger logger = LoggerFactory.getLogger(OracleSource.class);
+
+    public OracleSource() {
+    }
+
+    @Override
+    public List<Reader> split(JobProfile conf) {
+        super.init(conf);
+        Reader oracleReader = new OracleReader();
+        List<Reader> readerList = new ArrayList<>();
+        readerList.add(oracleReader);
+        sourceMetric.sourceSuccessCount.incrementAndGet();
+        return readerList;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index 5736f21b9..115070dc4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -21,8 +21,14 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.inlong.common.metric.MetricRegister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -49,6 +55,8 @@ public abstract class AbstractReader implements Reader {
     protected String metricName;
     protected Map<String, String> dimensions;
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractReader.class);
+
     @Override
     public void init(JobProfile jobConf) {
         inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
@@ -68,4 +76,31 @@ public abstract class AbstractReader implements Reader {
     public String getInlongGroupId() {
         return inlongGroupId;
     }
+
+    /**
+     * Serialize a specific offset (server partition plus file/position pair) into its JSON form.
+     *
+     * @param server specific server
+     * @param file specific offset file
+     * @param pos specific offset pos
+     * @return the serialized offset as a JSON string
+     */
+    public String serializeOffset(final String server, final String file,
+            final String pos) {
+        Map<String, Object> sourceOffset = new HashMap<>();
+        sourceOffset.put("file", file);
+        sourceOffset.put("pos", pos);
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", server);
+        specificOffset.setSourcePartition(sourcePartition);
+        byte[] serializedOffset = new byte[0];
+        try {
+            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        } catch (IOException e) {
+            LOGGER.error("serialize offset message error", e);
+        }
+        return new String(serializedOffset, StandardCharsets.UTF_8);
+    }
 }
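
The new serializeOffset(..) centralizes what BinlogReader (below) and
MongoDBReader previously did inline: wrap a file/position pair plus a server
partition into a DebeziumOffset and serialize it to JSON for
InLongFileOffsetBackingStore. A hedged usage sketch from inside a reader
subclass (the exact JSON layout is whatever DebeziumOffsetSerializer emits, so
the comment below is approximate):

    // Mirrors the BinlogReader call later in this commit:
    String offsetState = serializeOffset(instanceId, specificOffsetFile, specificOffsetPos);
    // Approximately: {"sourcePartition":{"server":...},
    //                 "sourceOffset":{"file":...,"pos":...}}
    props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, offsetState);
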
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 38877951b..fa61c0a10 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -35,14 +35,11 @@ import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -215,8 +212,13 @@ public class BinlogReader extends AbstractReader {
         props.setProperty("offset.storage.file.filename", offsetStoreFileName);
         props.setProperty("database.history.file.filename", databaseStoreHistoryName);
         if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
+            Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
             props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
@@ -235,28 +237,6 @@ public class BinlogReader extends AbstractReader {
         return props;
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     @Override
     public void destroy() {
         synchronized (this) {
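
For reference, a minimal sketch of a job profile that exercises the SPECIFIC_OFFSETS branch above; the binlog file and position are hypothetical, and the snapshot-mode key is assumed to follow the same job.binlogJob.* naming as the offset keys:

    JobProfile profile = new JobProfile();
    // snapshot-mode key assumed; the offset keys are the constants checked above
    profile.set("job.binlogJob.snapshot.mode", SnapshotModeConstants.SPECIFIC_OFFSETS);
    profile.set(BinlogReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "mysql-bin.000003");
    profile.set(BinlogReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "4");
    BinlogReader binlogReader = new BinlogReader();
    binlogReader.init(profile);
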
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
index 3abe037bb..eee9e4bea 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -35,15 +35,12 @@ import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.sources.snapshot.MongoDBSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.inlong.agent.utils.GsonUtil;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
@@ -305,8 +302,13 @@ public class MongoDBReader extends AbstractReader {
 
         String snapshotMode = props.getOrDefault(JOB_MONGO_SNAPSHOT_MODE, "").toString();
         if (Objects.equals(SnapshotModeConstants.INITIAL, snapshotMode)) {
+            Preconditions.checkNotNull(specificOffsetFile,
+                    JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(specificOffsetPos,
+                    JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
         }
@@ -325,28 +327,6 @@ public class MongoDBReader extends AbstractReader {
         builder.with(field, value);
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     /**
      * Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)}
      * for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished.
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
similarity index 72%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
index 079d9af07..bfc80542a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java
@@ -1,326 +1,308 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sources.reader;
-
-import com.google.common.base.Preconditions;
-import com.google.gson.Gson;
-import io.debezium.connector.sqlserver.SqlServerConnector;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import io.debezium.relational.history.FileDatabaseHistory;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.SnapshotModeConstants;
-import org.apache.inlong.agent.constant.SqlServerConstants;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
-import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
-import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
-import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
-import org.apache.inlong.agent.utils.GsonUtil;
-import org.apache.kafka.connect.storage.FileOffsetBackingStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-
-/**
- * Read data from SQLServer database by Debezium
- */
-public class SQLServerReader extends AbstractReader {
-
-    public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
-    public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
-    public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
-    public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
-    public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
-    public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
-    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode";
-    public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize";
-    public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets";
-    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile";
-    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos";
-
-    public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName";
-
-    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
-    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
-    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
-
-    private static final Gson GSON = new Gson();
-
-    private String databaseStoreHistoryName;
-    private String instanceId;
-    private String dbName;
-    private String serverName;
-    private String userName;
-    private String password;
-    private String hostName;
-    private String port;
-    private String offsetFlushIntervalMs;
-    private String offsetStoreFileName;
-    private String snapshotMode;
-    private String offset;
-    private String specificOffsetFile;
-    private String specificOffsetPos;
-
-    private ExecutorService executor;
-    private SqlServerSnapshotBase sqlServerSnapshot;
-    private boolean finished = false;
-
-    private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
-    private JobProfile jobProfile;
-    private boolean destroyed = false;
-
-    public SQLServerReader() {
-
-    }
-
-    @Override
-    public Message read() {
-        if (!sqlServerMessageQueue.isEmpty()) {
-            return getSqlServerMessage();
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * poll message from buffer pool
-     *
-     * @return org.apache.inlong.agent.plugin.Message
-     */
-    private DefaultMessage getSqlServerMessage() {
-        // Retrieves and removes the head of this queue,
-        // or returns null if this queue is empty.
-        Pair<String, String> message = sqlServerMessageQueue.poll();
-        if (Objects.isNull(message)) {
-            return null;
-        }
-        Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
-        header.put(PROXY_KEY_DATA, message.getKey());
-        return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
-    }
-
-    public boolean isDestroyed() {
-        return destroyed;
-    }
-
-    @Override
-    public boolean isFinished() {
-        return finished;
-    }
-
-    @Override
-    public String getReadSource() {
-        return instanceId;
-    }
-
-    @Override
-    public void setReadTimeout(long mill) {
-
-    }
-
-    @Override
-    public void setWaitMillisecond(long millis) {
-
-    }
-
-    @Override
-    public String getSnapshot() {
-        if (sqlServerSnapshot != null) {
-            return sqlServerSnapshot.getSnapshot();
-        } else {
-            return StringUtils.EMPTY;
-        }
-    }
-
-    @Override
-    public void finishRead() {
-        this.finished = true;
-    }
-
-    @Override
-    public boolean isSourceExist() {
-        return true;
-    }
-
-    private String tryToInitAndGetHistoryPath() {
-        String historyPath = agentConf.get(
-                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
-        String parentPath = agentConf.get(
-                AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
-        return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
-    }
-
-    @Override
-    public void init(JobProfile jobConf) {
-        super.init(jobConf);
-        jobProfile = jobConf;
-        LOGGER.info("init SqlServer reader with jobConf {}", jobConf.toJsonStr());
-        userName = jobConf.get(JOB_DATABASE_USER);
-        password = jobConf.get(JOB_DATABASE_PASSWORD);
-        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
-        port = jobConf.get(JOB_DATABASE_PORT);
-        dbName = jobConf.get(JOB_DATABASE_DBNAME);
-        serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
-        instanceId = jobConf.getInstanceId();
-        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
-        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
-                tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
-        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL);
-        sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
-        finished = false;
-
-        databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
-                tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
-        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
-        specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
-        specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
-
-        sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName);
-        sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile());
-
-        Properties props = getEngineProps();
-
-        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
-                        io.debezium.engine.format.Json.class)
-                .using(props)
-                .notifying((records, committer) -> {
-                    try {
-                        for (ChangeEvent<String, String> record : records) {
-                            DebeziumFormat debeziumFormat = GSON
-                                    .fromJson(record.value(), DebeziumFormat.class);
-                            sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
-                            committer.markProcessed(record);
-                        }
-                        committer.markBatchFinished();
-                        long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
-                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                                System.currentTimeMillis(), records.size(), dataSize);
-                        readerMetric.pluginReadSuccessCount.addAndGet(records.size());
-                        readerMetric.pluginReadCount.addAndGet(records.size());
-                    } catch (Exception e) {
-                        readerMetric.pluginReadFailCount.addAndGet(records.size());
-                        readerMetric.pluginReadCount.addAndGet(records.size());
-                        LOGGER.error("parse SqlServer message error", e);
-                    }
-                })
-                .using((success, message, error) -> {
-                    if (!success) {
-                        LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error);
-                    }
-                }).build();
-
-        executor = Executors.newSingleThreadExecutor();
-        executor.execute(engine);
-
-        LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
-    }
-
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
-    private Properties getEngineProps() {
-        Properties props = new Properties();
-        props.setProperty("name", "engine" + instanceId);
-        props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
-        props.setProperty("database.hostname", hostName);
-        props.setProperty("database.port", port);
-        props.setProperty("database.user", userName);
-        props.setProperty("database.password", password);
-        props.setProperty("database.dbname", dbName);
-        props.setProperty("database.server.name", serverName);
-        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
-        props.setProperty("database.snapshot.mode", snapshotMode);
-        props.setProperty("key.converter.schemas.enable", "false");
-        props.setProperty("value.converter.schemas.enable", "false");
-        props.setProperty("snapshot.mode", snapshotMode);
-        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
-        props.setProperty("database.history.file.filename", databaseStoreHistoryName);
-        if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
-            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
-            props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
-        } else {
-            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
-        }
-        props.setProperty("tombstones.on.delete", "false");
-        props.setProperty("converters", "datetime");
-        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
-        props.setProperty("datetime.format.date", "yyyy-MM-dd");
-        props.setProperty("datetime.format.time", "HH:mm:ss");
-        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
-        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
-
-        LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), props);
-        return props;
-    }
-
-    @Override
-    public void destroy() {
-        synchronized (this) {
-            if (!destroyed) {
-                this.executor.shutdownNow();
-                this.sqlServerSnapshot.close();
-                this.destroyed = true;
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import io.debezium.connector.oracle.OracleConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.OracleConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.GsonUtil;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+
+/**
+ * Read data from Oracle database by Debezium
+ */
+public class OracleReader extends AbstractReader {
+
+    public static final String ORACLE_READER_TAG_NAME = "AgentOracleMetric";
+    public static final String JOB_DATABASE_USER = "job.oracleJob.user";
+    public static final String JOB_DATABASE_PASSWORD = "job.oracleJob.password";
+    public static final String JOB_DATABASE_HOSTNAME = "job.oracleJob.hostname";
+    public static final String JOB_DATABASE_PORT = "job.oracleJob.port";
+    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.oracleJob.snapshot.mode";
+    public static final String JOB_DATABASE_SERVER_NAME = "job.oracleJob.serverName";
+    public static final String JOB_DATABASE_QUEUE_SIZE = "job.oracleJob.queueSize";
+    public static final String JOB_DATABASE_OFFSETS = "job.oracleJob.offsets";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.oracleJob.offset.specificOffsetFile";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.oracleJob.offset.specificOffsetPos";
+    public static final String JOB_DATABASE_DBNAME = "job.oracleJob.dbname";
+    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.oracleJob.offset.intervalMs";
+    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.oracleJob.history.filename";
+
+    private static final Gson GSON = new Gson();
+    public static final String ORACLE = "oracle";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleReader.class);
+
+    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+
+    private String databaseStoreHistoryName;
+    private String instanceId;
+    private String dbName;
+    private String serverName;
+    private String userName;
+    private String password;
+    private String hostName;
+    private String port;
+    private String offsetFlushIntervalMs;
+    private String offsetStoreFileName;
+    private String snapshotMode;
+    private String offset;
+    private String specificOffsetFile;
+    private String specificOffsetPos;
+    private OracleSnapshotBase oracleSnapshot;
+    private boolean finished = false;
+    private ExecutorService executor;
+
+    /**
+     * pair.left : table name
+     * pair.right : actual data
+     */
+    private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue;
+    private JobProfile jobProfile;
+    private boolean destroyed = false;
+
+    public OracleReader() {
+    }
+
+    @Override
+    public Message read() {
+        if (!oracleMessageQueue.isEmpty()) {
+            return getOracleMessage();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * poll message from buffer pool
+     *
+     * @return org.apache.inlong.agent.plugin.Message
+     */
+    private DefaultMessage getOracleMessage() {
+        // Retrieves and removes the head of this queue,
+        // or returns null if this queue is empty.
+        Pair<String, String> message = oracleMessageQueue.poll();
+        if (Objects.isNull(message)) {
+            return null;
+        }
+        Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
+        header.put(PROXY_KEY_DATA, message.getKey());
+        return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
+    }
+
+    public boolean isDestroyed() {
+        return destroyed;
+    }
+
+    @Override
+    public boolean isFinished() {
+        return finished;
+    }
+
+    @Override
+    public String getReadSource() {
+        return instanceId;
+    }
+
+    @Override
+    public void setReadTimeout(long mill) {
+
+    }
+
+    @Override
+    public void setWaitMillisecond(long millis) {
+
+    }
+
+    @Override
+    public String getSnapshot() {
+        if (oracleSnapshot != null) {
+            return oracleSnapshot.getSnapshot();
+        } else {
+            return StringUtils.EMPTY;
+        }
+    }
+
+    @Override
+    public void finishRead() {
+        this.finished = true;
+    }
+
+    @Override
+    public boolean isSourceExist() {
+        return true;
+    }
+
+    private String tryToInitAndGetHistoryPath() {
+        String historyPath = agentConf.get(
+                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+        String parentPath = agentConf.get(
+                AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+        return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
+    }
+
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        jobProfile = jobConf;
+        LOGGER.info("init oracle reader with jobConf {}", jobConf.toJsonStr());
+        userName = jobConf.get(JOB_DATABASE_USER);
+        password = jobConf.get(JOB_DATABASE_PASSWORD);
+        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+        port = jobConf.get(JOB_DATABASE_PORT);
+        dbName = jobConf.get(JOB_DATABASE_DBNAME);
+        serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
+        instanceId = jobConf.getInstanceId();
+        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
+        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
+        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, OracleConstants.INITIAL);
+        oracleMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
+        finished = false;
+
+        databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
+        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+        specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+        specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+        oracleSnapshot = new OracleSnapshotBase(offsetStoreFileName);
+        oracleSnapshot.save(offset, oracleSnapshot.getFile());
+
+        Properties props = getEngineProps();
+
+        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
+                        io.debezium.engine.format.Json.class)
+                .using(props)
+                .notifying((records, committer) -> {
+                    try {
+                        for (ChangeEvent<String, String> record : records) {
+                            DebeziumFormat debeziumFormat = GSON
+                                    .fromJson(record.value(), DebeziumFormat.class);
+                            oracleMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
+                            committer.markProcessed(record);
+                        }
+                        committer.markBatchFinished();
+                        long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                System.currentTimeMillis(), records.size(), dataSize);
+                        readerMetric.pluginReadSuccessCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                    } catch (Exception e) {
+                        readerMetric.pluginReadFailCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                        LOGGER.error("parse binlog message error", e);
+                    }
+                })
+                .using((success, message, error) -> {
+                    if (!success) {
+                        LOGGER.error("oracle job with jobConf {} has error {}", instanceId, message, error);
+                    }
+                }).build();
+
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(engine);
+
+        LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
+    }
+
+    private Properties getEngineProps() {
+        Properties props = new Properties();
+        props.setProperty("name", "engine" + instanceId);
+        props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
+        props.setProperty("database.hostname", hostName);
+        props.setProperty("database.port", port);
+        props.setProperty("database.user", userName);
+        props.setProperty("database.password", password);
+        props.setProperty("database.dbname", dbName);
+        props.setProperty("database.server.name", serverName);
+        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+        props.setProperty("database.snapshot.mode", snapshotMode);
+        props.setProperty("key.converter.schemas.enable", "false");
+        props.setProperty("value.converter.schemas.enable", "false");
+        props.setProperty("snapshot.mode", snapshotMode);
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        props.setProperty("database.history.file.filename", databaseStoreHistoryName);
+        if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            Preconditions.checkNotNull(specificOffsetFile,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(specificOffsetPos,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
+            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
+            props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
+        } else {
+            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
+        }
+        props.setProperty("tombstones.on.delete", "false");
+        props.setProperty("converters", "datetime");
+        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
+        props.setProperty("datetime.format.date", "yyyy-MM-dd");
+        props.setProperty("datetime.format.time", "HH:mm:ss");
+        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+
+        LOGGER.info("oracle job {} start with props {}", jobProfile.getInstanceId(), props);
+        return props;
+    }
+
+    @Override
+    public void destroy() {
+        synchronized (this) {
+            if (!destroyed) {
+                this.executor.shutdownNow();
+                this.oracleSnapshot.close();
+                this.destroyed = true;
+            }
+        }
+    }
+}
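
OracleReader buffers each Debezium change event as a (table, JSON) pair and hands it out through read(). A minimal polling sketch, assuming a job profile populated as in TestOracleConnect further below:

    OracleReader oracleReader = new OracleReader();
    oracleReader.init(jobProfile); // keys as in TestOracleConnect below
    while (!oracleReader.isFinished()) {
        Message message = oracleReader.read(); // null while the queue is empty
        if (message != null) {
            // the header carries the source table name under PROXY_KEY_DATA
            String table = message.getHeader().get(CommonConstants.PROXY_KEY_DATA);
        }
    }
    oracleReader.destroy();
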
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index 23bfa1fbe..38f02e277 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -34,14 +34,11 @@ import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -212,8 +209,13 @@ public class PostgreSQLReader extends AbstractReader {
         props.setProperty("snapshot.mode", snapshotMode);
         props.setProperty("offset.storage.file.filename", offsetStoreFileName);
         if (PostgreSQLConstants.CUSTOM.equals(snapshotMode)) {
+            Preconditions.checkNotNull(specificOffsetFile,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(specificOffsetPos,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
         }
@@ -229,28 +231,6 @@ public class PostgreSQLReader extends AbstractReader {
         return props;
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     @Override
     public void destroy() {
         synchronized (this) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index 079d9af07..3a6eeafcc 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -37,15 +37,12 @@ import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
-import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.inlong.agent.utils.GsonUtil;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -80,7 +77,7 @@ public class SQLServerReader extends AbstractReader {
     public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
     public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerReader.class);
     private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
 
     private static final Gson GSON = new Gson();
@@ -254,28 +251,6 @@ public class SQLServerReader extends AbstractReader {
         LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
     }
 
-    private String serializeOffset() {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
-        sourceOffset.put("file", specificOffsetFile);
-        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
-                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
-        sourceOffset.put("pos", specificOffsetPos);
-        DebeziumOffset specificOffset = new DebeziumOffset();
-        specificOffset.setSourceOffset(sourceOffset);
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("server", instanceId);
-        specificOffset.setSourcePartition(sourcePartition);
-        byte[] serializedOffset = new byte[0];
-        try {
-            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
-        } catch (IOException e) {
-            LOGGER.error("serialize offset message error", e);
-        }
-        return new String(serializedOffset, StandardCharsets.UTF_8);
-    }
-
     private Properties getEngineProps() {
         Properties props = new Properties();
         props.setProperty("name", "engine" + instanceId);
@@ -294,8 +269,13 @@ public class SQLServerReader extends AbstractReader {
         props.setProperty("offset.storage.file.filename", offsetStoreFileName);
         props.setProperty("database.history.file.filename", databaseStoreHistoryName);
         if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            Preconditions.checkNotNull(specificOffsetFile,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+            Preconditions.checkNotNull(specificOffsetPos,
+                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
             props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
-            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
+                    serializeOffset(instanceId, specificOffsetFile, specificOffsetPos));
             props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
         } else {
             props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java
new file mode 100644
index 000000000..4c19c3eea
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Oracle Snapshot
+ */
+public class OracleSnapshotBase extends AbstractSnapshot {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotBase.class);
+    private final File file;
+
+    public OracleSnapshotBase(String filePath) {
+        file = new File(filePath);
+    }
+
+    @Override
+    public String getSnapshot() {
+        byte[] offset = this.load(this.file);
+        return ENCODER.encodeToString(offset);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+}
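
The file managed here is the offset store that OracleReader.init() seeds. A minimal round-trip sketch, assuming the save/load/ENCODER helpers inherited from AbstractSnapshot; the path and offset value are hypothetical:

    OracleSnapshotBase snapshot = new OracleSnapshotBase("/tmp/offset.dat-job1");
    snapshot.save("111", snapshot.getFile()); // persist the raw offset, as OracleReader.init() does
    String encoded = snapshot.getSnapshot(); // reload and Base64-encode the stored bytes
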
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
new file mode 100644
index 000000000..f577ed5db
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
+/**
+ * Test cases for {@link OracleReader}.
+ */
+public class TestOracleConnect {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestOracleConnect.class);
+
+    @Ignore
+    public void testOracle() {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set("job.oracleJob.hostname", "localhost");
+        jobProfile.set("job.oracleJob.port", "1521");
+        jobProfile.set("job.oracleJob.user", "c##dbzuser");
+        jobProfile.set("job.oracleJob.password", "dbz");
+        jobProfile.set("job.oracleJob.sid", "ORCLCDB");
+        jobProfile.set("job.oracleJob.dbname", "ORCLCDB");
+        jobProfile.set("job.oracleJob.serverName", "server1");
+        jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+        OracleReader oracleReader = new OracleReader();
+        oracleReader.init(jobProfile);
+        while (true) {
+            Message message = oracleReader.read();
+            if (message != null) {
+                LOGGER.info("event content: {}", message);
+            }
+        }
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
new file mode 100644
index 000000000..db24e3acc
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.OracleConstants;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.field;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Test cases for {@link OracleReader}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, OracleReader.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleReader {
+
+    private OracleReader reader;
+
+    @Mock
+    private JobProfile jobProfile;
+
+    @Mock
+    private AgentMetricItemSet agentMetricItemSet;
+
+    @Mock
+    private AgentMetricItem agentMetricItem;
+
+    @Mock
+    private OracleSnapshotBase oracleSnapshot;
+
+    @Mock
+    private DebeziumEngine.Builder builder;
+
+    @Mock
+    private ExecutorService executorService;
+
+    @Mock
+    private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue;
+
+    @Mock
+    private DebeziumEngine<ChangeEvent<String, String>> engine;
+
+    private AtomicLong atomicLong;
+
+    private AtomicLong atomicCountLong;
+
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
+
+    @Before
+    public void setUp() throws Exception {
+        final String username = "sa";
+        final String password = "123456";
+        final String hostname = "127.0.0.1";
+        final String port = "1434";
+        final String groupId = "group01";
+        final String streamId = "stream01";
+        final String dbName = "testdb";
+        final String serverName = "serverName";
+        final String offsetFlushIntervalMs = "1000";
+        final String offsetStoreFileName = "/opt/offset.dat";
+        final String snapshotMode = OracleConstants.INITIAL;
+        final int queueSize = 1000;
+        final String databaseStoreHistoryName = "/opt/history.dat";
+        final String offset = "111";
+        final String specificOffsetFile = "";
+        final String specificOffsetPos = "-1";
+
+        atomicLong = new AtomicLong(0L);
+        atomicCountLong = new AtomicLong(0L);
+
+        when(jobProfile.getInstanceId()).thenReturn(instanceId);
+        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId);
+        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_USER))).thenReturn(username);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PASSWORD))).thenReturn(password);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PORT))).thenReturn(port);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_DBNAME))).thenReturn(dbName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn(
+                offsetFlushIntervalMs);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn(
+                offsetStoreFileName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode);
+        when(jobProfile.getInt(eq(OracleReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn(databaseStoreHistoryName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn(
+                specificOffsetFile);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn(
+                specificOffsetPos);
+        whenNew(OracleSnapshotBase.class).withAnyArguments().thenReturn(oracleSnapshot);
+
+        // mock oracleMessageQueue
+        whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(oracleMessageQueue);
+
+        // mock DebeziumEngine
+        mockStatic(DebeziumEngine.class);
+        when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+        when(builder.using(any(Properties.class))).thenReturn(builder);
+        when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+        when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+        when(builder.build()).thenReturn(engine);
+
+        // mock executorService
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
+
+        // mock metrics
+        whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
+        when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+        field(AgentMetricItem.class, "pluginReadCount").set(agentMetricItem, atomicLong);
+        field(AgentMetricItem.class, "pluginReadSuccessCount").set(agentMetricItem, atomicCountLong);
+
+        mockStatic(MetricRegister.class);
+        reader = new OracleReader();
+        reader.init(jobProfile);
+    }
+
+    /**
+     * Test cases for {@link OracleReader#read()}.
+     */
+    @Test
+    public void testRead() throws Exception {
+        final String right = "value";
+        final String left = "key";
+        final String dataKey = "dataKey";
+        when(oracleMessageQueue.isEmpty()).thenReturn(true);
+        assertEquals(null, reader.read());
+        when(oracleMessageQueue.isEmpty()).thenReturn(false);
+        when(oracleMessageQueue.poll()).thenReturn(Pair.of(left, right));
+        Message result = reader.read();
+        assertEquals(String.join(right, "\"", "\""), result.toString()); // GsonUtil.toJson quotes the raw string
+        assertEquals(left, result.getHeader().get(dataKey));
+    }
+
+    /**
+     * Test cases for {@link OracleReader#destroy()}.
+     */
+    @Test
+    public void testDestroy() throws Exception {
+        assertFalse(reader.isDestroyed());
+        reader.destroy();
+        verify(executorService).shutdownNow();
+        verify(oracleSnapshot).close();
+        assertTrue(reader.isDestroyed());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#finishRead()}.
+     */
+    @Test
+    public void testFinishRead() throws Exception {
+        assertFalse(reader.isFinished());
+        reader.finishRead();
+        assertTrue(reader.isFinished());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#isSourceExist()}.
+     */
+    @Test
+    public void testIsSourceExist() {
+        assertTrue(reader.isSourceExist());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#getSnapshot()}.
+     */
+    @Test
+    public void testGetSnapshot() {
+        final String snapshot = "snapshot";
+        when(oracleSnapshot.getSnapshot()).thenReturn(snapshot);
+        assertEquals(snapshot, reader.getSnapshot());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#getReadSource()}.
+     */
+    @Test
+    public void testGetReadSource() {
+        assertEquals(instanceId, reader.getReadSource());
+    }
+}
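
For reference, the builder chain mocked above (create -> using -> notifying -> using -> build) follows the Debezium embedded-engine API that OracleReader drives. Below is a minimal standalone sketch of that wiring; the property values and the batch-handler body are illustrative assumptions, not code from this commit:

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;

    import java.util.Properties;
    import java.util.concurrent.Executors;

    Properties props = new Properties();
    props.setProperty("name", "oracle-engine"); // assumed engine name
    props.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");
    // plus database.hostname/port/user/password/dbname, offset storage, and
    // database history settings, corresponding to the job profile keys mocked above

    DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
            .create(io.debezium.engine.format.Json.class) // key/value serialized as JSON
            .using(props)
            .notifying((records, committer) -> {          // ChangeConsumer callback
                for (ChangeEvent<String, String> record : records) {
                    // enqueue record.key()/record.value(), as the mocked queue suggests
                    committer.markProcessed(record);
                }
                committer.markBatchFinished();
            })
            .using((success, message, error) -> { })      // CompletionCallback
            .build();

    Executors.newSingleThreadExecutor().execute(engine);  // the engine is a Runnable
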
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
new file mode 100644
index 000000000..160357e77
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.common.metric.MetricItem;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.powermock.api.support.membermodification.MemberMatcher.field;
+
+/**
+ * Test cases for {@link OracleSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({OracleSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleSource {
+
+    @Mock
+    JobProfile jobProfile;
+
+    @Mock
+    private AgentMetricItemSet agentMetricItemSet;
+
+    @Mock
+    private AgentMetricItem agentMetricItem;
+
+    private AtomicLong sourceSuccessCount;
+
+    private AtomicLong sourceFailCount;
+
+    @Before
+    public void setup() throws Exception {
+        sourceSuccessCount = new AtomicLong(0);
+        sourceFailCount = new AtomicLong(0);
+
+        // mock metrics
+        whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
+        when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+        field(AgentMetricItem.class, "sourceSuccessCount").set(agentMetricItem, sourceSuccessCount);
+        field(AgentMetricItem.class, "sourceFailCount").set(agentMetricItem, sourceFailCount);
+        PowerMockito.mockStatic(MetricRegister.class);
+        PowerMockito.doNothing().when(
+                MetricRegister.class, "register", any(MetricItem.class));
+    }
+
+    /**
+     * Test cases for {@link OracleSource#split(JobProfile)}.
+     */
+    @Test
+    public void testSplit() {
+
+        // build the source under test
+        final OracleSource source = new OracleSource();
+        // assert
+        assertEquals(1, source.split(jobProfile).size());
+    }
+}
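
The two test classes above exercise the full plugin lifecycle: split a job into readers, then init, read, finishRead, and destroy each reader. A minimal sketch of the driver loop that lifecycle implies follows; the profile construction and the message handling are assumptions, not part of this diff:

    import org.apache.inlong.agent.conf.JobProfile;
    import org.apache.inlong.agent.plugin.Message;
    import org.apache.inlong.agent.plugin.Reader;

    JobProfile profile = new JobProfile(); // assumed: populated with the OracleReader.JOB_DATABASE_* settings mocked above
    OracleSource source = new OracleSource();
    for (Reader reader : source.split(profile)) { // testSplit expects exactly one reader
        reader.init(profile);                     // builds and starts the Debezium engine
        while (!reader.isFinished()) {
            Message message = reader.read();      // null while the change queue is empty
            if (message != null) {
                // hand the message off to the job's channel/sink here
            }
        }
        reader.destroy();                         // shuts down the executor, closes the snapshot
    }
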
diff --git a/pom.xml b/pom.xml
index 7b5c185f7..c7ae7ac70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -623,6 +623,13 @@
                 <artifactId>debezium-connector-postgres</artifactId>
                 <version>${debezium.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.debezium</groupId>
+                <artifactId>debezium-connector-oracle</artifactId>
+                <version>${debezium.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.h2database</groupId>
                 <artifactId>h2</artifactId>


[inlong] 02/07: [INLONG-6477][Manager] Add consume API in the manager client (#6480)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f7553910668072bf0b2719cf028709da2a00eabb
Author: haifxu <xh...@gmail.com>
AuthorDate: Wed Nov 9 17:37:41 2022 +0800

    [INLONG-6477][Manager] Add consume API in the manager client (#6480)
---
 .../inlong/manager/client/api/InlongConsume.java   |  84 +++++++++++
 .../manager/client/api/impl/InlongConsumeImpl.java |  89 ++++++++++++
 .../client/api/inner/client/ClientFactory.java     |   2 +
 .../api/inner/client/InlongConsumeClient.java      | 147 +++++++++++++++++++
 .../client/api/service/InlongConsumeApi.java       |  59 ++++++++
 .../client/api/inner/ClientFactoryTest.java        |   3 +
 .../client/api/inner/InlongConsumeClientTest.java  | 160 +++++++++++++++++++++
 .../web/controller/InlongConsumeController.java    |   5 +-
 8 files changed, 546 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java
new file mode 100644
index 000000000..182d7bae2
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api;
+
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+
+public interface InlongConsume {
+
+    /**
+     * Save inlong consume info.
+     *
+     * @param request consume request that needs to be saved
+     * @return inlong consume id after saving
+     */
+    Integer save(InlongConsumeRequest request);
+
+    /**
+     * Get inlong consume info based on ID
+     *
+     * @param id inlong consume id
+     * @return detail of the inlong consume
+     */
+    InlongConsumeInfo get(Integer id);
+
+    /**
+     * Query the inlong consume status statistics for the current user
+     *
+     * @return inlong consume status statistics
+     */
+    InlongConsumeCountInfo countStatusByUser();
+
+    /**
+     * Paging query inlong consume info list
+     *
+     * @param request pagination query request
+     * @return inlong consume list
+     */
+    PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request);
+
+    /**
+     * Update the inlong consume
+     *
+     * @param request inlong consume request that needs to be updated
+     * @return inlong consume id after updating
+     */
+    Integer update(InlongConsumeRequest request);
+
+    /**
+     * Delete the inlong consume by the id
+     *
+     * @param id inlong consume id that needs to be deleted
+     * @return whether the deletion succeeded
+     */
+    Boolean delete(Integer id);
+
+    /**
+     * Start the process for the specified ID.
+     *
+     * @param id inlong consume id
+     * @return workflow result
+     */
+    WorkflowResult startProcess(Integer id);
+}
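
A minimal sketch of how a caller might use this new interface end to end; the ClientConfiguration wiring (service URL, authentication) is an assumption for illustration, not part of this commit:

    import org.apache.inlong.manager.client.api.ClientConfiguration;
    import org.apache.inlong.manager.client.api.InlongConsume;
    import org.apache.inlong.manager.client.api.impl.InlongConsumeImpl;
    import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
    import org.apache.inlong.manager.pojo.workflow.WorkflowResult;

    ClientConfiguration configuration = new ClientConfiguration(); // assumed: url/auth set elsewhere
    InlongConsume consume = new InlongConsumeImpl(configuration);

    ConsumePulsarRequest request = new ConsumePulsarRequest();
    request.setTopic("test_topic");                  // topic and consumer group are checked by save()
    request.setConsumerGroup("test_consume_group");

    Integer id = consume.save(request);               // create the consume
    WorkflowResult result = consume.startProcess(id); // submit it to the approval workflow
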
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java
new file mode 100644
index 000000000..cc0139d56
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.InlongConsume;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+
+public class InlongConsumeImpl implements InlongConsume {
+
+    private final InlongConsumeClient consumeClient;
+
+    public InlongConsumeImpl(ClientConfiguration configuration) {
+        ClientFactory clientFactory = ClientUtils.getClientFactory(configuration);
+        this.consumeClient = clientFactory.getConsumeClient();
+    }
+
+    @Override
+    public Integer save(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+        Preconditions.checkNotNull(request.getTopic(), "inlong consume topic cannot be null");
+        Preconditions.checkNotNull(request.getConsumerGroup(), "inlong consume consumer group cannot be null");
+
+        return consumeClient.save(request);
+    }
+
+    @Override
+    public InlongConsumeInfo get(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        return consumeClient.get(id);
+    }
+
+    @Override
+    public InlongConsumeCountInfo countStatusByUser() {
+        return consumeClient.countStatusByUser();
+    }
+
+    @Override
+    public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request) {
+        return consumeClient.list(request);
+    }
+
+    @Override
+    public Integer update(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+
+        return consumeClient.update(request);
+    }
+
+    @Override
+    public Boolean delete(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        return consumeClient.delete(id);
+    }
+
+    @Override
+    public WorkflowResult startProcess(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        return consumeClient.startProcess(id);
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
index e96b52de9..77a63cb1a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
@@ -50,6 +50,7 @@ public class ClientFactory {
 
     private final WorkflowApproverClient workflowApproverClient;
     private final WorkflowEventClient workflowEventClient;
+    private final InlongConsumeClient consumeClient;
 
     public ClientFactory(ClientConfiguration configuration) {
         groupClient = new InlongGroupClient(configuration);
@@ -65,5 +66,6 @@ public class ClientFactory {
         heartbeatClient = new HeartbeatClient(configuration);
         workflowApproverClient = new WorkflowApproverClient(configuration);
         workflowEventClient = new WorkflowEventClient(configuration);
+        consumeClient = new InlongConsumeClient(configuration);
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
new file mode 100644
index 000000000..36ba6389b
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InlongConsumeApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+
+import java.util.Map;
+
+/**
+ * Client for {@link InlongConsumeApi}.
+ */
+public class InlongConsumeClient {
+
+    private final InlongConsumeApi inlongConsumeApi;
+
+    public InlongConsumeClient(ClientConfiguration configuration) {
+        inlongConsumeApi = ClientUtils.createRetrofit(configuration).create(InlongConsumeApi.class);
+    }
+
+    /**
+     * Save inlong consume info.
+     *
+     * @param request consume request that needs to be saved
+     * @return inlong consume id after saving
+     */
+    public Integer save(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+        Preconditions.checkNotNull(request.getTopic(), "inlong consume topic cannot be null");
+        Preconditions.checkNotNull(request.getConsumerGroup(), "inlong consume consumer group cannot be null");
+
+        Response<Integer> response = ClientUtils.executeHttpCall(inlongConsumeApi.save(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Get inlong consume info based on ID
+     *
+     * @param id inlong consume id
+     * @return detail of the inlong consume
+     */
+    public InlongConsumeInfo get(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        Response<InlongConsumeInfo> response = ClientUtils.executeHttpCall(inlongConsumeApi.get(id));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Query the inlong consume status statistics for the current user
+     *
+     * @return inlong consume status statistics
+     */
+    public InlongConsumeCountInfo countStatusByUser() {
+        Response<InlongConsumeCountInfo> response = ClientUtils.executeHttpCall(inlongConsumeApi.countStatusByUser());
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Paging query inlong consume info list
+     *
+     * @param request pagination query request
+     * @return inlong consume list
+     */
+    public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request) {
+        Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(request,
+                new TypeReference<Map<String, Object>>() {
+                });
+
+        Response<PageResult<InlongConsumeBriefInfo>> response = ClientUtils.executeHttpCall(
+                inlongConsumeApi.list(requestMap));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Update the inlong consume
+     *
+     * @param request inlong consume request that needs to be updated
+     * @return inlong consume id after updating
+     */
+    public Integer update(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+
+        Response<Integer> response = ClientUtils.executeHttpCall(inlongConsumeApi.update(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Delete the inlong consume by the id
+     *
+     * @param id inlong consume id that needs to be deleted
+     * @return whether the deletion succeeded
+     */
+    public Boolean delete(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongConsumeApi.delete(id));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Start the process for the specified ID.
+     *
+     * @param id inlong consume id
+     * @return workflow result
+     */
+    public WorkflowResult startProcess(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        Response<WorkflowResult> response = ClientUtils.executeHttpCall(inlongConsumeApi.startProcess(id));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
new file mode 100644
index 000000000..d78dd5a5a
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.service;
+
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.DELETE;
+import retrofit2.http.GET;
+import retrofit2.http.POST;
+import retrofit2.http.Path;
+import retrofit2.http.Query;
+
+import java.util.Map;
+
+public interface InlongConsumeApi {
+
+    @POST("consume/save")
+    Call<Response<Integer>> save(@Body InlongConsumeRequest request);
+
+    @GET("consume/get/{id}")
+    Call<Response<InlongConsumeInfo>> get(@Path("id") Integer id);
+
+    @GET("consume/countStatus")
+    Call<Response<InlongConsumeCountInfo>> countStatusByUser();
+
+    @GET("consume/list")
+    Call<Response<PageResult<InlongConsumeBriefInfo>>> list(@Query("request") Map<String, Object> request);
+
+    @POST("consume/update")
+    Call<Response<Integer>> update(@Body InlongConsumeRequest request);
+
+    @DELETE("consume/delete/{id}")
+    Call<Response<Boolean>> delete(@Path("id") Integer id);
+
+    @POST("consume/startProcess/{id}")
+    Call<Response<WorkflowResult>> startProcess(@Path("id") Integer id);
+}
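
These Retrofit annotations map each method onto the manager's /inlong/manager/api/consume/* endpoints (the same paths stubbed in the WireMock test below). The sketch that follows shows roughly what ClientUtils.createRetrofit is expected to produce; the base URL and converter choice are assumptions for illustration:

    import retrofit2.Retrofit;
    import retrofit2.converter.jackson.JacksonConverterFactory;

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("http://127.0.0.1:8083/inlong/manager/api/")  // assumed address; must end with '/'
            .addConverterFactory(JacksonConverterFactory.create()) // JSON <-> POJO via Jackson
            .build();
    InlongConsumeApi api = retrofit.create(InlongConsumeApi.class);
    // e.g. Response<Integer> saved = api.save(request).execute().body();
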
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index a5f29c30e..8918edb26 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.DataNodeClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
 import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
@@ -121,6 +122,7 @@ class ClientFactoryTest {
     private static DataNodeClient dataNodeClient;
     private static UserClient userClient;
     private static WorkflowClient workflowClient;
+    private static InlongConsumeClient consumeClient;
 
     @BeforeAll
     static void setup() {
@@ -143,6 +145,7 @@ class ClientFactoryTest {
         dataNodeClient = clientFactory.getDataNodeClient();
         userClient = clientFactory.getUserClient();
         workflowClient = clientFactory.getWorkflowClient();
+        consumeClient = clientFactory.getConsumeClient();
     }
 
     @AfterAll
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java
new file mode 100644
index 000000000..359145181
--- /dev/null
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner;
+
+import com.google.common.collect.Lists;
+import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+
+/**
+ * Tests for {@link InlongConsumeClient}
+ */
+public class InlongConsumeClientTest extends ClientFactoryTest {
+
+    private final InlongConsumeClient consumeClient = clientFactory.getConsumeClient();
+
+    @Test
+    void testConsumeSave() {
+        stubFor(
+                post(urlMatching("/inlong/manager/api/consume/save.*"))
+                        .willReturn(
+                                okJson(JsonUtils.toJsonString(Response.success(1)))
+                        )
+        );
+
+        InlongConsumeRequest request = new ConsumePulsarRequest();
+        request.setTopic("test_topic");
+        request.setMqType(MQType.PULSAR);
+        request.setConsumerGroup("test_consume_group");
+        Integer consumeId = consumeClient.save(request);
+        Assertions.assertEquals(1, consumeId);
+    }
+
+    @Test
+    void testConsumeGet() {
+        InlongConsumeInfo response = new ConsumePulsarInfo();
+        response.setMqType(MQType.PULSAR);
+        response.setId(1);
+
+        stubFor(
+                get(urlMatching("/inlong/manager/api/consume/get/1.*"))
+                        .willReturn(
+                                okJson(JsonUtils.toJsonString(Response.success(response)))
+                        )
+        );
+
+        InlongConsumeInfo consumeInfo = consumeClient.get(1);
+        Assertions.assertEquals(1, consumeInfo.getId());
+        Assertions.assertTrue(consumeInfo instanceof ConsumePulsarInfo);
+    }
+
+    @Test
+    void testConsumeCountStatus() {
+        InlongConsumeCountInfo response = new InlongConsumeCountInfo();
+        response.setTotalCount(10);
+        response.setRejectCount(2);
+        response.setWaitApproveCount(5);
+        response.setWaitAssignCount(3);
+
+        stubFor(
+                get(urlMatching("/inlong/manager/api/consume/countStatus.*"))
+                        .willReturn(
+                                okJson(JsonUtils.toJsonString(Response.success(response)))
+                        )
+        );
+
+        InlongConsumeCountInfo consumeCountInfo = consumeClient.countStatusByUser();
+        Assertions.assertEquals(10, consumeCountInfo.getTotalCount());
+    }
+
+    @Test
+    void testConsumeList() {
+        List<InlongConsumeBriefInfo> responses = Lists.newArrayList(
+                InlongConsumeBriefInfo.builder()
+                        .id(1)
+                        .mqType(MQType.PULSAR)
+                        .inlongGroupId("test_group_id")
+                        .consumerGroup("test_consume_group")
+                        .build()
+        );
+
+        stubFor(
+                get(urlMatching("/inlong/manager/api/consume/list.*"))
+                        .willReturn(
+                                okJson(JsonUtils.toJsonString(Response.success(new PageResult<>(responses))))
+                        )
+        );
+
+        PageResult<InlongConsumeBriefInfo> briefInfoPageResult = consumeClient.list(new InlongConsumePageRequest());
+        Assertions.assertEquals(JsonUtils.toJsonString(responses),
+                JsonUtils.toJsonString(briefInfoPageResult.getList()));
+    }
+
+    @Test
+    void testConsumeUpdate() {
+        stubFor(
+                post(urlMatching("/inlong/manager/api/consume/update.*"))
+                        .willReturn(
+                                okJson(JsonUtils.toJsonString(Response.success(1)))
+                        )
+        );
+
+        InlongConsumeRequest request = new ConsumePulsarRequest();
+        request.setId(1);
+        request.setMqType(MQType.PULSAR);
+        Integer consumeId = consumeClient.update(request);
+        Assertions.assertEquals(1, consumeId);
+    }
+
+    @Test
+    void testConsumeDelete() {
+        stubFor(
+                delete(urlMatching("/inlong/manager/api/consume/delete/1.*"))
+                        .willReturn(
+                                okJson(JsonUtils.toJsonString(Response.success(true)))
+                        )
+        );
+
+        InlongConsumeRequest request = new ConsumePulsarRequest();
+        request.setId(1);
+        request.setMqType(MQType.PULSAR);
+        Boolean delete = consumeClient.delete(1);
+        Assertions.assertTrue(delete);
+    }
+}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
index 475ebda07..0dafa4765 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -99,9 +99,8 @@ public class InlongConsumeController {
     @OperationLog(operation = OperationType.DELETE)
     @ApiOperation(value = "Delete inlong consume by ID")
     @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
-    public Response<Object> delete(@PathVariable(name = "id") Integer id) {
-        consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
-        return Response.success();
+    public Response<Boolean> delete(@PathVariable(name = "id") Integer id) {
+        return Response.success(consumeService.delete(id, LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping("/consume/startProcess/{id}")


[inlong] 04/07: [INLONG-6482][Dashboard] Sink management distinguishes between save-only and save-and-submit processes (#6483)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1d298a1b8a26c6e1f707bdb84bc83198edf92f79
Author: Daniel <le...@apache.org>
AuthorDate: Wed Nov 9 17:46:10 2022 +0800

    [INLONG-6482][Dashboard] Sink management distinguishes between save-only and save-and-submit processes (#6483)
---
 inlong-dashboard/src/locales/cn.json               |  3 +++
 inlong-dashboard/src/locales/en.json               |  3 +++
 .../src/metas/groups/defaults/index.ts             |  2 +-
 .../pages/GroupDetail/DataStorage/DetailModal.tsx  | 24 +++++++++++++++++++---
 .../src/pages/GroupDetail/DataStorage/index.tsx    |  1 +
 5 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index e99371b9c..44ad7e605 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -384,6 +384,9 @@
   "pages.GroupDetail.Sink.Status.Pending": "配置中",
   "pages.GroupDetail.Sink.Status.Error": "配置失败",
   "pages.GroupDetail.Sink.Status.Success": "配置成功",
+  "pages.GroupDetail.Sink.Cancel": "取消",
+  "pages.GroupDetail.Sink.Save": "仅保存",
+  "pages.GroupDetail.Sink.SaveAndRun": "保存并提交流程",
   "pages.GroupDetail.Stream.StreamConfigTitle": "数据流配置",
   "pages.GroupDetail.Stream.CreateDataStream": "新建数据流",
   "pages.GroupDetail.PageTitle": "详情",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 255f56996..ecdfcfa24 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -384,6 +384,9 @@
   "pages.GroupDetail.Sink.Status.Pending": "Pending",
   "pages.GroupDetail.Sink.Status.Error": "Error",
   "pages.GroupDetail.Sink.Status.Success": "Success",
+  "pages.GroupDetail.Sink.Cancel": "Cancel",
+  "pages.GroupDetail.Sink.Save": "Save",
+  "pages.GroupDetail.Sink.SaveAndRun": "Save and submit the process",
   "pages.GroupDetail.Stream.StreamConfigTitle": "Data stream configuration",
   "pages.GroupDetail.Stream.CreateDataStream": "Create",
   "pages.GroupDetail.PageTitle": "Detail",
diff --git a/inlong-dashboard/src/metas/groups/defaults/index.ts b/inlong-dashboard/src/metas/groups/defaults/index.ts
index e68379f4a..1eacc5e5b 100644
--- a/inlong-dashboard/src/metas/groups/defaults/index.ts
+++ b/inlong-dashboard/src/metas/groups/defaults/index.ts
@@ -37,7 +37,7 @@ export const allDefaultGroups: MetaExportWithBackendList<GroupMetaType> = [
     LoadEntity: () => import('./Pulsar'),
   },
   {
-    label: 'TubeMq',
+    label: 'TubeMQ',
     value: 'TUBEMQ',
     LoadEntity: () => import('./TubeMq'),
   },
diff --git a/inlong-dashboard/src/pages/GroupDetail/DataStorage/DetailModal.tsx b/inlong-dashboard/src/pages/GroupDetail/DataStorage/DetailModal.tsx
index bba88d61b..0e5489545 100644
--- a/inlong-dashboard/src/pages/GroupDetail/DataStorage/DetailModal.tsx
+++ b/inlong-dashboard/src/pages/GroupDetail/DataStorage/DetailModal.tsx
@@ -18,7 +18,7 @@
  */
 
 import React, { useMemo, useState } from 'react';
-import { Skeleton, Modal, message } from 'antd';
+import { Button, Skeleton, Modal, message } from 'antd';
 import { ModalProps } from 'antd/es/modal';
 import { useRequest, useUpdateEffect } from '@/hooks';
 import { useTranslation } from 'react-i18next';
@@ -84,7 +84,7 @@ const Comp: React.FC<DetailModalProps> = ({ inlongGroupId, id, ...modalProps })
     return Entity ? new Entity().renderRow() : [];
   }, [Entity]);
 
-  const onOk = async () => {
+  const onOk = async (startProcess = false) => {
     const values = await form.validateFields();
     const submitData = new Entity()?.stringify(values) || values;
     const isUpdate = Boolean(id);
@@ -92,6 +92,9 @@ const Comp: React.FC<DetailModalProps> = ({ inlongGroupId, id, ...modalProps })
       submitData.id = id;
       submitData.version = data?.version;
     }
+    if (startProcess) {
+      submitData.startProcess = true;
+    }
     await request({
       url: isUpdate ? '/sink/update' : '/sink/save',
       method: 'POST',
@@ -105,7 +108,22 @@ const Comp: React.FC<DetailModalProps> = ({ inlongGroupId, id, ...modalProps })
   };
 
   return (
-    <Modal title="Sink" width={1200} {...modalProps} onOk={onOk}>
+    <Modal
+      title="Sink"
+      width={1200}
+      {...modalProps}
+      footer={[
+        <Button key="cancel" onClick={modalProps.onCancel}>
+          {t('pages.GroupDetail.Sink.Cancel')}
+        </Button>,
+        <Button key="save" type="primary" onClick={() => onOk(false)}>
+          {t('pages.GroupDetail.Sink.Save')}
+        </Button>,
+        <Button key="run" type="primary" onClick={() => onOk(true)}>
+          {t('pages.GroupDetail.Sink.SaveAndRun')}
+        </Button>,
+      ]}
+    >
       {loading ? (
         <Skeleton active />
       ) : (
diff --git a/inlong-dashboard/src/pages/GroupDetail/DataStorage/index.tsx b/inlong-dashboard/src/pages/GroupDetail/DataStorage/index.tsx
index cdada63ba..3c836da23 100644
--- a/inlong-dashboard/src/pages/GroupDetail/DataStorage/index.tsx
+++ b/inlong-dashboard/src/pages/GroupDetail/DataStorage/index.tsx
@@ -76,6 +76,7 @@ const Comp = ({ inlongGroupId, readonly }: Props, ref) => {
             method: 'DELETE',
             params: {
               sinkType: options.sinkType,
+              startProcess: true,
             },
           });
           await getList();