You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 04:05:19 UTC

[inlong] branch branch-1.4 updated (da4ef8691 -> 144e9c0c0)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from da4ef8691 [INLONG-6434][Release] Bumped version to 1.4.0 (#6453)
     new 7b7fc132f [INLONG-6423][TubeMQ] Consumer registration failed due to BDB error (#6450)
     new cbe2af998 [INLONG-6418][Dashboard] Support management of MongoDB source (#6454)
     new 448dada0a [INLONG-6440][Manager] Fix heartbeat information error for IP (#6444)
     new 1cf83e216 [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)
     new 144e9c0c0 [INLONG-6459][CI] Match the branch-version as release branch (#6460)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci_docker.yml                    |   6 +-
 ...reSQLConstants.java => SqlServerConstants.java} |  25 +-
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  39 +++
 .../pojo/{BinlogJob.java => SqlServerJob.java}     |  53 ++--
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../agent/plugin/sources/SQLServerSource.java      |  34 +-
 .../plugin/sources/reader/SQLServerReader.java     | 344 ++++++++++++++-------
 ...napshotBase.java => SqlServerSnapshotBase.java} |   8 +-
 .../agent/plugin/sources/TestSQLServerConnect.java |  48 +--
 .../agent/plugin/sources/TestSQLServerReader.java  | 156 +++++-----
 .../agent/plugin/sources/TestSQLServerSource.java  |  13 -
 inlong-dashboard/src/locales/cn.json               |   6 +
 inlong-dashboard/src/locales/en.json               |   6 +
 .../metas/sources/defaults/{File.ts => Mongodb.ts} |  58 ++--
 .../src/metas/sources/defaults/index.ts            |   5 +
 .../service/heartbeat/HeartbeatManager.java        |   2 +
 .../server/broker/offset/OffsetRecordService.java  |   2 +-
 .../inlong/tubemq/server/master/TMaster.java       |   6 +-
 pom.xml                                            |   6 +
 19 files changed, 497 insertions(+), 325 deletions(-)
 copy inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/{PostgreSQLConstants.java => SqlServerConstants.java} (60%)
 copy inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/{BinlogJob.java => SqlServerJob.java} (62%)
 copy inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/{PostgreSQLSnapshotBase.java => SqlServerSnapshotBase.java} (88%)
 copy inlong-dashboard/src/metas/sources/defaults/{File.ts => Mongodb.ts} (62%)


[inlong] 01/05: [INLONG-6423][TubeMQ] Consumer registration failed due to BDB error (#6450)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7b7fc132f83e881509adba16e5e6fc0f893ef683
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Nov 8 11:00:43 2022 +0800

    [INLONG-6423][TubeMQ] Consumer registration failed due to BDB error (#6450)
---
 .../inlong/tubemq/server/broker/offset/OffsetRecordService.java     | 2 +-
 .../main/java/org/apache/inlong/tubemq/server/master/TMaster.java   | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 2f55d4f14..16901911f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -79,7 +79,7 @@ public class OffsetRecordService extends AbstractDaemonService {
         // check topic writable status
         TopicMetadata topicMetadata = storeManager.getMetadataManager()
                 .getTopicMetadata(TServerConstants.OFFSET_HISTORY_NAME);
-        if (!topicMetadata.isAcceptPublish()) {
+        if (topicMetadata == null || !topicMetadata.isAcceptPublish()) {
             return;
         }
         // get group offset information
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index e581a5105..6a5b76b90 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -575,6 +575,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             return builder.build();
         }
         final String groupName = (String) paramCheckResult.checkData;
+        checkNodeStatus(consumerId, strBuffer);
         if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
                 request.getTopicListList(), result, strBuffer)) {
             builder.setErrCode(result.getErrCode());
@@ -626,7 +627,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             return builder.build();
         }
         ConsumerInfo inConsumerInfo2 = (ConsumerInfo) paramCheckResult.checkData;
-        checkNodeStatus(consumerId, strBuffer);
         CertifiedResult authorizeResult =
                 serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
                         groupName, reqTopicSet, reqTopicConditions, rmtAddress);
@@ -1258,6 +1258,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             return builder.build();
         }
         final String groupName = (String) paramCheckResult.checkData;
+        // check master current status
+        checkNodeStatus(consumerId, sBuffer);
         if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
                 request.getTopicListList(), result, sBuffer)) {
             builder.setErrCode(result.getErrCode());
@@ -1283,8 +1285,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         if (request.hasOpsTaskInfo()) {
             opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
         }
-        // check master current status
-        checkNodeStatus(consumerId, sBuffer);
         ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
         if (request.hasSubRepInfo()) {
             clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());


[inlong] 05/05: [INLONG-6459][CI] Match the branch-version as release branch (#6460)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 144e9c0c0ab02e464aad088de1001ed6fdb4d706
Author: Charles Zhang <do...@apache.org>
AuthorDate: Tue Nov 8 11:59:45 2022 +0800

    [INLONG-6459][CI] Match the branch-version as release branch (#6460)
---
 .github/workflows/ci_docker.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci_docker.yml b/.github/workflows/ci_docker.yml
index 2e6ab4bbd..98af35529 100644
--- a/.github/workflows/ci_docker.yml
+++ b/.github/workflows/ci_docker.yml
@@ -95,11 +95,11 @@ jobs:
         run: |
           if [[ ${{ github.ref_name }} == ${{ github.event.repository.default_branch }} ]]; then
             echo "::set-output name=match_master::true"
-          elif [[ ${{ github.ref_name }} =~ ^release-[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
+          elif [[ ${{ github.ref_name }} =~ ^branch-[0-9]+\.[0-9]+$ ]]; then
             echo "::set-output name=match_release::true"
           fi
 
-      # Publish x86 Docker images when the changes are being pushed to the master branch or a release branch like 'release-1.0.0'.
+      # Publish x86 Docker images when the changes are being pushed to the master branch or a release branch like 'branch-1.4'.
       - name: Push x86 Docker images to Docker Hub
         if: |
           steps.match.outputs.match_master == 'true'
@@ -112,7 +112,7 @@ jobs:
           DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
           DOCKER_ORG: inlong
 
-      # Publish aarch64 Docker images when the changes are being pushed to a release branch like 'release-1.0.0'.
+      # Publish aarch64 Docker images when the changes are being pushed to a release branch like 'branch-1.4'.
       - name: Push aarch64 Docker images to Docker Hub
         if: ${{ steps.match.outputs.match_release == 'true' }}
         working-directory: docker


[inlong] 04/05: [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1cf83e2167fbd2ebbde2477b32a538b48ab90c54
Author: haibo.duan <dh...@live.cn>
AuthorDate: Tue Nov 8 11:54:57 2022 +0800

    [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)
---
 .../inlong/agent/constant/SqlServerConstants.java  |  39 +++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  39 +++
 .../org/apache/inlong/agent/pojo/SqlServerJob.java |  76 +++++
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../agent/plugin/sources/SQLServerSource.java      |  34 +-
 .../plugin/sources/reader/SQLServerReader.java     | 344 ++++++++++++++-------
 .../sources/snapshot/SqlServerSnapshotBase.java    |  52 ++++
 .../agent/plugin/sources/TestSQLServerConnect.java |  48 +--
 .../agent/plugin/sources/TestSQLServerReader.java  | 156 +++++-----
 .../agent/plugin/sources/TestSQLServerSource.java  |  13 -
 pom.xml                                            |   6 +
 11 files changed, 556 insertions(+), 256 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java
new file mode 100644
index 000000000..ffb4d7ae8
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+public class SqlServerConstants {
+
+    /**
+     * Takes a snapshot of structure and data of captured tables;
+     * useful if topics should be populated with a complete representation of the data from the captured tables.
+     */
+    public static final String INITIAL = "initial";
+
+    /**
+     * Takes a snapshot of structure and data like initial
+     * but instead does not transition into streaming changes once the snapshot has completed.
+     */
+    public static final String INITIAL_ONLY = "initial_only";
+
+    /**
+     * Takes a snapshot of the structure of captured tables only;
+     * useful if only changes happening from now onwards should be propagated to topics.
+     */
+    public static final String SCHEMA_ONLY = "schema_only";
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index d57b2c341..810a42d2b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -58,6 +58,10 @@ public class JobProfileDto {
      * mqtt source
      */
     public static final String MQTT_SOURCE = "org.apache.inlong.agent.plugin.sources.MqttSource";
+    /**
+     * sqlserver source
+     */
+    public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SqlServerSource";
 
     private static final Gson GSON = new Gson();
 
@@ -226,6 +230,34 @@ public class JobProfileDto {
         return mongoJob;
     }
 
+    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+        SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+                SqlServerJob.SqlserverJobConfig.class);
+        SqlServerJob oracleJob = new SqlServerJob();
+        oracleJob.setUser(config.getUser());
+        oracleJob.setHostname(config.getHostname());
+        oracleJob.setPassword(config.getPassword());
+        oracleJob.setPort(config.getPort());
+        oracleJob.setServerName(config.getServerName());
+        oracleJob.setDbname(config.getDbname());
+
+        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        oracleJob.setOffset(offset);
+
+        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        oracleJob.setSnapshot(snapshot);
+
+        SqlServerJob.History history = new SqlServerJob.History();
+        history.setFilename(config.getHistoryFilename());
+        oracleJob.setHistory(history);
+
+        return oracleJob;
+    }
+
     public static MqttJob getMqttJob(DataConfig dataConfigs) {
         MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
                 MqttJob.MqttJobConfig.class);
@@ -309,6 +341,12 @@ public class JobProfileDto {
                 job.setSource(KAFKA_SOURCE);
                 profileDto.setJob(job);
                 break;
+            case SQLSERVER:
+                SqlServerJob sqlserverJob = getSqlServerJob(dataConfig);
+                job.setSqlserverJob(sqlserverJob);
+                job.setSource(SQLSERVER_SOURCE);
+                profileDto.setJob(job);
+                break;
             case MONGODB:
                 MongoJob mongoJob = getMongoJob(dataConfig);
                 job.setMongoJob(mongoJob);
@@ -349,6 +387,7 @@ public class JobProfileDto {
         private KafkaJob kafkaJob;
         private MongoJob mongoJob;
         private MqttJob mqttJob;
+        private SqlServerJob sqlserverJob;
     }
 
     @Data
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java
new file mode 100644
index 000000000..735c745bb
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class SqlServerJob {
+
+    private String hostname;
+    private String user;
+    private String password;
+    private String port;
+    private String serverName;
+    private String dbname;
+
+    private SqlServerJob.Snapshot snapshot;
+    private SqlServerJob.Offset offset;
+    private SqlServerJob.History history;
+
+    @Data
+    public static class Offset {
+
+        private String intervalMs;
+        private String filename;
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+
+    @Data
+    public static class Snapshot {
+
+        private String mode;
+    }
+
+    @Data
+    public static class History {
+
+        private String filename;
+    }
+
+    @Data
+    public static class SqlserverJobConfig {
+
+        private String hostname;
+        private String user;
+        private String password;
+        private String port;
+        private String dbname;
+        private String serverName;
+
+        private String snapshotMode;
+        private String intervalMs;
+        private String offsetFilename;
+        private String historyFilename;
+
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 141f7ff9f..7bc8eb0f6 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -85,6 +85,11 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-sqlserver</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-mongodb</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index af6d04a93..a65006152 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -17,56 +17,32 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
-import org.apache.inlong.agent.utils.AgentDbUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 
 /**
- * SQLServer SQL source, split SQLServer SQL source job into multi readers
+ * SQLServer source
  */
 public class SQLServerSource extends AbstractSource {
 
     private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class);
 
-    public static final String JOB_DATABASE_SQL = "job.sql.command";
-
     public SQLServerSource() {
     }
 
-    private List<Reader> splitSqlJob(String sqlPattern) {
-        final List<Reader> result = new ArrayList<>();
-        String[] sqlList = AgentDbUtils.replaceDynamicSeq(sqlPattern);
-        if (Objects.nonNull(sqlList)) {
-            Arrays.stream(sqlList).forEach(sql -> {
-                result.add(new SQLServerReader(sql));
-            });
-        }
-        return result;
-    }
-
     @Override
     public List<Reader> split(JobProfile conf) {
         super.init(conf);
-        String sqlPattern = conf.get(JOB_DATABASE_SQL, StringUtils.EMPTY).toLowerCase();
-        List<Reader> readerList = null;
-        if (StringUtils.isNotEmpty(sqlPattern)) {
-            readerList = splitSqlJob(sqlPattern);
-        }
-        if (CollectionUtils.isNotEmpty(readerList)) {
-            sourceMetric.sourceSuccessCount.incrementAndGet();
-        } else {
-            sourceMetric.sourceFailCount.incrementAndGet();
-        }
+        Reader sqlServerReader = new SQLServerReader();
+        List<Reader> readerList = new ArrayList<>();
+        readerList.add(sqlServerReader);
+        sourceMetric.sourceSuccessCount.incrementAndGet();
         return readerList;
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index c4daa8560..079d9af07 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -17,114 +17,129 @@
 
 package org.apache.inlong.agent.plugin.sources.reader;
 
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang3.CharUtils;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import io.debezium.connector.sqlserver.SqlServerConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.relational.history.FileDatabaseHistory;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
+import org.apache.inlong.agent.utils.GsonUtil;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static java.sql.Types.BINARY;
-import static java.sql.Types.BLOB;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.VARBINARY;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
 
 /**
- * Read data from SQLServer database by SQL
+ * Read data from SQLServer database by Debezium
  */
 public class SQLServerReader extends AbstractReader {
 
     public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
-    public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
-    public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
     public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
     public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
+    public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
+    public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
     public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
-    public static final String JOB_DATABASE_BATCH_SIZE = "job.sqlserverJob.batchSize";
-    public static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000;
-    public static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass";
-    public static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
-    public static final String STD_FIELD_SEPARATOR_SHORT = "\001";
-    public static final String JOB_DATABASE_SEPARATOR = "job.sql.separator";
-    // pre-set sql lines, commands like "set xxx=xx;"
-    public static final String JOB_DATABASE_TYPE = "job.database.type";
-    public static final String SQLSERVER = "sqlserver";
+    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode";
+    public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize";
+    public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos";
+
+    public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName";
+
+    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
+    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
+
     private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
-    private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR),
-            String.valueOf(CharUtils.LF)};
-    private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY};
+    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
 
-    private final String sql;
+    private static final Gson GSON = new Gson();
 
-    private PreparedStatement preparedStatement;
-    private Connection conn;
-    private ResultSet resultSet;
-    private int columnCount;
+    private String databaseStoreHistoryName;
+    private String instanceId;
+    private String dbName;
+    private String serverName;
+    private String userName;
+    private String password;
+    private String hostName;
+    private String port;
+    private String offsetFlushIntervalMs;
+    private String offsetStoreFileName;
+    private String snapshotMode;
+    private String offset;
+    private String specificOffsetFile;
+    private String specificOffsetPos;
 
-    // column types
-    private String[] columnTypeNames;
-    private int[] columnTypeCodes;
+    private ExecutorService executor;
+    private SqlServerSnapshotBase sqlServerSnapshot;
     private boolean finished = false;
-    private String separator;
 
-    public SQLServerReader(String sql) {
-        this.sql = sql;
+    private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
+    private JobProfile jobProfile;
+    private boolean destroyed = false;
+
+    public SQLServerReader() {
+
     }
 
     @Override
     public Message read() {
-        try {
-            if (!resultSet.next()) {
-                finished = true;
-                return null;
-            }
-            final List<String> lineColumns = new ArrayList<>();
-            for (int i = 1; i <= columnCount; i++) {
-                final String dataValue;
-                /* handle special blob value, encode with base64, BLOB=2004 */
-                final int typeCode = columnTypeCodes[i - 1];
-                final String typeName = columnTypeNames[i - 1];
-
-                // binary type
-                if (typeCode == BLOB || typeCode == BINARY || typeCode == VARBINARY
-                        || typeCode == LONGVARBINARY || typeName.contains("BLOB")) {
-                    final byte[] data = resultSet.getBytes(i);
-                    dataValue = new String(Base64.encodeBase64(data, false), StandardCharsets.UTF_8);
-                } else {
-                    // non-binary type
-                    dataValue = StringUtils.replaceEachRepeatedly(resultSet.getString(i),
-                            NEW_LINE_CHARS, EMPTY_CHARS);
-                }
-                lineColumns.add(dataValue);
-            }
-            long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                    System.currentTimeMillis(), 1, dataSize);
-            readerMetric.pluginReadSuccessCount.incrementAndGet();
-            readerMetric.pluginReadCount.incrementAndGet();
-            return generateMessage(lineColumns);
-        } catch (Exception ex) {
-            LOGGER.error("error while reading data", ex);
-            readerMetric.pluginReadFailCount.incrementAndGet();
-            readerMetric.pluginReadCount.incrementAndGet();
-            throw new RuntimeException(ex);
+        if (!sqlServerMessageQueue.isEmpty()) {
+            return getSqlServerMessage();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * poll message from buffer pool
+     *
+     * @return org.apache.inlong.agent.plugin.Message
+     */
+    private DefaultMessage getSqlServerMessage() {
+        // Retrieves and removes the head of this queue,
+        // or returns null if this queue is empty.
+        Pair<String, String> message = sqlServerMessageQueue.poll();
+        if (Objects.isNull(message)) {
+            return null;
         }
+        Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
+        header.put(PROXY_KEY_DATA, message.getKey());
+        return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
     }
 
-    private Message generateMessage(List<String> lineColumns) {
-        return new DefaultMessage(StringUtils.join(lineColumns, separator).getBytes(StandardCharsets.UTF_8));
+    public boolean isDestroyed() {
+        return destroyed;
     }
 
     @Override
@@ -134,7 +149,7 @@ public class SQLServerReader extends AbstractReader {
 
     @Override
     public String getReadSource() {
-        return sql;
+        return instanceId;
     }
 
     @Override
@@ -149,12 +164,16 @@ public class SQLServerReader extends AbstractReader {
 
     @Override
     public String getSnapshot() {
-        return StringUtils.EMPTY;
+        if (sqlServerSnapshot != null) {
+            return sqlServerSnapshot.getSnapshot();
+        } else {
+            return StringUtils.EMPTY;
+        }
     }
 
     @Override
     public void finishRead() {
-        destroy();
+        this.finished = true;
     }
 
     @Override
@@ -162,59 +181,146 @@ public class SQLServerReader extends AbstractReader {
         return true;
     }
 
+    private String tryToInitAndGetHistoryPath() {
+        String historyPath = agentConf.get(
+                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+        String parentPath = agentConf.get(
+                AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+        return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
+    }
+
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
-        String userName = jobConf.get(JOB_DATABASE_USER);
-        String password = jobConf.get(JOB_DATABASE_PASSWORD);
-        String hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
-        String dbname = jobConf.get(JOB_DATABASE_DBNAME);
-        int port = jobConf.getInt(JOB_DATABASE_PORT);
-
-        String driverClass = jobConf.get(JOB_DATABASE_DRIVER_CLASS,
-                DEFAULT_JOB_DATABASE_DRIVER_CLASS);
-        separator = jobConf.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT);
+        jobProfile = jobConf;
+        LOGGER.info("init SqlServer reader with jobConf {}", jobConf.toJsonStr());
+        userName = jobConf.get(JOB_DATABASE_USER);
+        password = jobConf.get(JOB_DATABASE_PASSWORD);
+        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+        port = jobConf.get(JOB_DATABASE_PORT);
+        dbName = jobConf.get(JOB_DATABASE_DBNAME);
+        serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
+        instanceId = jobConf.getInstanceId();
+        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
+        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
+        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL);
+        sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
         finished = false;
+
+        databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
+        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+        specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+        specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+        sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName);
+        sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile());
+
+        Properties props = getEngineProps();
+
+        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
+                        io.debezium.engine.format.Json.class)
+                .using(props)
+                .notifying((records, committer) -> {
+                    try {
+                        for (ChangeEvent<String, String> record : records) {
+                            DebeziumFormat debeziumFormat = GSON
+                                    .fromJson(record.value(), DebeziumFormat.class);
+                            sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
+                            committer.markProcessed(record);
+                        }
+                        committer.markBatchFinished();
+                        long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                System.currentTimeMillis(), records.size(), dataSize);
+                        readerMetric.pluginReadSuccessCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                    } catch (Exception e) {
+                        readerMetric.pluginReadFailCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                        LOGGER.error("parse SqlServer message error", e);
+                    }
+                })
+                .using((success, message, error) -> {
+                    if (!success) {
+                        LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error);
+                    }
+                }).build();
+
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(engine);
+
+        LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
+    }
+
+    private String serializeOffset() {
+        Map<String, Object> sourceOffset = new HashMap<>();
+        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null");
+        sourceOffset.put("file", specificOffsetFile);
+        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
+        sourceOffset.put("pos", specificOffsetPos);
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", instanceId);
+        specificOffset.setSourcePartition(sourcePartition);
+        byte[] serializedOffset = new byte[0];
         try {
-            String databaseType = jobConf.get(JOB_DATABASE_TYPE, SQLSERVER);
-            String url = String.format("jdbc:%s://%s:%d;databaseName=%s;", databaseType, hostName, port, dbname);
-            conn = AgentDbUtils.getConnectionFailover(driverClass, url, userName, password);
-            preparedStatement = conn.prepareStatement(sql);
-            preparedStatement.setFetchSize(batchSize);
-            resultSet = preparedStatement.executeQuery();
-
-            initColumnMeta();
-        } catch (Exception ex) {
-            LOGGER.error("error create statement", ex);
-            destroy();
-            throw new RuntimeException(ex);
+            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        } catch (IOException e) {
+            LOGGER.error("serialize offset message error", e);
         }
+        return new String(serializedOffset, StandardCharsets.UTF_8);
     }
 
-    /**
-     * Init column meta data.
-     *
-     * @throws Exception - sql exception
-     */
-    private void initColumnMeta() throws Exception {
-        columnCount = resultSet.getMetaData().getColumnCount();
-        columnTypeNames = new String[columnCount];
-        columnTypeCodes = new int[columnCount];
-        for (int i = 0; i < columnCount; i++) {
-            columnTypeCodes[i] = resultSet.getMetaData().getColumnType(i + 1);
-            String t = resultSet.getMetaData().getColumnTypeName(i + 1);
-            if (t != null) {
-                columnTypeNames[i] = t.toUpperCase();
-            }
+    private Properties getEngineProps() {
+        Properties props = new Properties();
+        props.setProperty("name", "engine" + instanceId);
+        props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
+        props.setProperty("database.hostname", hostName);
+        props.setProperty("database.port", port);
+        props.setProperty("database.user", userName);
+        props.setProperty("database.password", password);
+        props.setProperty("database.dbname", dbName);
+        props.setProperty("database.server.name", serverName);
+        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+        props.setProperty("database.snapshot.mode", snapshotMode);
+        props.setProperty("key.converter.schemas.enable", "false");
+        props.setProperty("value.converter.schemas.enable", "false");
+        props.setProperty("snapshot.mode", snapshotMode);
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        props.setProperty("database.history.file.filename", databaseStoreHistoryName);
+        if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
+        } else {
+            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
         }
+        props.setProperty("tombstones.on.delete", "false");
+        props.setProperty("converters", "datetime");
+        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
+        props.setProperty("datetime.format.date", "yyyy-MM-dd");
+        props.setProperty("datetime.format.time", "HH:mm:ss");
+        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+
+        LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), props);
+        return props;
     }
 
     @Override
     public void destroy() {
-        finished = true;
-        AgentUtils.finallyClose(resultSet);
-        AgentUtils.finallyClose(preparedStatement);
-        AgentUtils.finallyClose(conn);
+        synchronized (this) {
+            if (!destroyed) {
+                this.executor.shutdownNow();
+                this.sqlServerSnapshot.close();
+                this.destroyed = true;
+            }
+        }
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java
new file mode 100644
index 000000000..62ee38618
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * SqlServer Snapshot
+ */
+public class SqlServerSnapshotBase extends AbstractSnapshot {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotBase.class);
+    private final File file;
+
+    public SqlServerSnapshotBase(String filePath) {
+        file = new File(filePath);
+    }
+
+    @Override
+    public String getSnapshot() {
+        byte[] offset = this.load(this.file);
+        return ENCODER.encodeToString(offset);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
index b278abb73..58f83e2f4 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
@@ -18,43 +18,47 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
 import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Objects;
+import java.util.UUID;
 
-import static org.junit.Assert.assertNotNull;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 
 /**
  * Test cases for {@link SQLServerReader}.
  */
 public class TestSQLServerConnect {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestSQLServerConnect.class);
+
     /**
      * Just using in local test.
      */
+
     @Ignore
-    public void testSQLServerReader() {
-        JobProfile jobProfile = JobProfile.parseJsonStr("{}");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_USER, "sa");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_PASSWORD, "123456");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_HOSTNAME, "127.0.0.1");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_PORT, "1434");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_DBNAME, "inlong");
-        final String sql = "select * from dbo.test01";
-        jobProfile.set(SQLServerSource.JOB_DATABASE_SQL, sql);
-        final SQLServerSource source = new SQLServerSource();
-        List<Reader> readers = source.split(jobProfile);
-        for (Reader reader : readers) {
-            reader.init(jobProfile);
-            while (!reader.isFinished()) {
-                Message message = reader.read();
-                if (Objects.nonNull(message)) {
-                    assertNotNull(message.getBody());
-                }
+    public void testSqlServer() {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set("job.sqlserverJob.hostname", "localhost");
+        jobProfile.set("job.sqlserverJob.port", "1434");
+        jobProfile.set("job.sqlserverJob.user", "sa");
+        jobProfile.set("job.sqlserverJob.password", "123456");
+        jobProfile.set("job.sqlserverJob.dbname", "inlong");
+        jobProfile.set("job.sqlserverJob.serverName", "fullfillment");
+        jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+        SQLServerReader sqlServerReader = new SQLServerReader();
+        sqlServerReader.init(jobProfile);
+        while (true) {
+            Message message = sqlServerReader.read();
+            if (message != null) {
+                LOGGER.info("event content: {}", message);
             }
         }
     }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
index 23f2fc416..33e291fbe 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
@@ -17,15 +17,17 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.commons.lang3.StringUtils;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
-import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,11 +37,10 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.Types;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
@@ -49,7 +50,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.field;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -60,7 +60,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Test cases for {@link SQLServerReader}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({AgentDbUtils.class, MetricRegister.class, AuditUtils.class, SQLServerReader.class})
+@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, SQLServerReader.class})
 @PowerMockIgnore({"javax.management.*"})
 public class TestSQLServerReader {
 
@@ -70,28 +70,31 @@ public class TestSQLServerReader {
     private JobProfile jobProfile;
 
     @Mock
-    private Connection conn;
+    private AgentMetricItemSet agentMetricItemSet;
 
     @Mock
-    private PreparedStatement preparedStatement;
+    private AgentMetricItem agentMetricItem;
 
     @Mock
-    private ResultSet resultSet;
+    private SqlServerSnapshotBase sqlServerSnapshot;
 
     @Mock
-    private ResultSetMetaData metaData;
+    private DebeziumEngine.Builder builder;
 
     @Mock
-    private AgentMetricItemSet agentMetricItemSet;
+    private ExecutorService executorService;
 
     @Mock
-    private AgentMetricItem agentMetricItem;
+    private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
+
+    @Mock
+    private DebeziumEngine<ChangeEvent<String, String>> engine;
 
     private AtomicLong atomicLong;
 
     private AtomicLong atomicCountLong;
 
-    private String sql;
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
 
     @Before
     public void setUp() throws Exception {
@@ -99,44 +102,60 @@ public class TestSQLServerReader {
         final String password = "123456";
         final String hostname = "127.0.0.1";
         final String port = "1434";
-        final String dbname = "inlong";
-        final String typeName1 = "int";
-        final String typeName2 = "varchar";
         final String groupId = "group01";
         final String streamId = "stream01";
+        final String dbName = "inlong";
+        final String serverName = "server1";
+        final String offsetFlushIntervalMs = "1000";
+        final String offsetStoreFileName = "/opt/offset.dat";
+        final String snapshotMode = SqlServerConstants.INITIAL;
+        final int queueSize = 1000;
+        final String databaseStoreHistoryName = "/opt/history.dat";
+        final String offset = "111";
+        final String specificOffsetFile = "";
+        final String specificOffsetPos = "-1";
+
         atomicLong = new AtomicLong(0L);
         atomicCountLong = new AtomicLong(0L);
 
-        sql = "select * from dbo.test01";
-
+        when(jobProfile.getInstanceId()).thenReturn(instanceId);
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId);
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_USER))).thenReturn(username);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PASSWORD))).thenReturn(password);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PORT))).thenReturn(port);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbname);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DRIVER_CLASS), anyString())).thenReturn(
-                SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS);
-        when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_BATCH_SIZE), anyInt())).thenReturn(
-                SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_TYPE), anyString())).thenReturn(
-                SQLServerReader.SQLSERVER);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SEPARATOR), anyString())).thenReturn(
-                SQLServerReader.STD_FIELD_SEPARATOR_SHORT);
-        mockStatic(AgentDbUtils.class);
-        when(AgentDbUtils.getConnectionFailover(eq(SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS), anyString(),
-                eq(username), eq(password))).thenReturn(conn);
-        when(conn.prepareStatement(anyString())).thenReturn(preparedStatement);
-        when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        when(resultSet.getMetaData()).thenReturn(metaData);
-        when(metaData.getColumnCount()).thenReturn(2);
-        when(metaData.getColumnName(1)).thenReturn("id");
-        when(metaData.getColumnName(2)).thenReturn("cell");
-        when(metaData.getColumnType(1)).thenReturn(Types.INTEGER);
-        when(metaData.getColumnType(2)).thenReturn(Types.VARCHAR);
-        when(metaData.getColumnTypeName(1)).thenReturn(typeName1);
-        when(metaData.getColumnTypeName(2)).thenReturn(typeName2);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn(
+                offsetFlushIntervalMs);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn(
+                offsetStoreFileName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode);
+        when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn(
+                databaseStoreHistoryName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn(
+                specificOffsetFile);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn(
+                specificOffsetPos);
+        whenNew(SqlServerSnapshotBase.class).withAnyArguments().thenReturn(sqlServerSnapshot);
+
+        //mock sqlServerMessageQueue
+        whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(sqlServerMessageQueue);
+
+        //mock DebeziumEngine
+        mockStatic(DebeziumEngine.class);
+        when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+        when(builder.using(any(Properties.class))).thenReturn(builder);
+        when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+        when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+        when(builder.build()).thenReturn(engine);
+
+        //mock executorService
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
 
         //mock metrics
         whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
@@ -146,7 +165,7 @@ public class TestSQLServerReader {
 
         //init method
         mockStatic(MetricRegister.class);
-        (reader = new SQLServerReader(sql)).init(jobProfile);
+        (reader = new SQLServerReader()).init(jobProfile);
     }
 
     /**
@@ -154,23 +173,16 @@ public class TestSQLServerReader {
      */
     @Test
     public void testRead() throws Exception {
-        final String v11 = "11";
-        final String v12 = "12";
-        final String v21 = "aa";
-        final String v22 = "bb";
-
-        final String msg1 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v11, v12);
-        final String msg2 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v21, v22);
-
-        when(resultSet.next()).thenReturn(true, true, false);
-        when(resultSet.getString(1)).thenReturn(v11, v21);
-        when(resultSet.getString(2)).thenReturn(v12, v22);
-        Message message1 = reader.read();
-        assertEquals(msg1, message1.toString());
-        verify(preparedStatement, times(1)).setFetchSize(SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE);
-        Message message2 = reader.read();
-        assertEquals(msg2, message2.toString());
-        assertEquals(2L, atomicLong.get());
+        final String right = "value";
+        final String left = "key";
+        final String dataKey = "dataKey";
+        when(sqlServerMessageQueue.isEmpty()).thenReturn(true);
+        assertEquals(null, reader.read());
+        when(sqlServerMessageQueue.isEmpty()).thenReturn(false);
+        when(sqlServerMessageQueue.poll()).thenReturn(Pair.of(left, right));
+        Message result = reader.read();
+        assertEquals(String.join(right, "\"", "\""), result.toString());
+        assertEquals(left, result.getHeader().get(dataKey));
     }
 
     /**
@@ -178,12 +190,11 @@ public class TestSQLServerReader {
      */
     @Test
     public void testDestroy() throws Exception {
-        assertFalse(reader.isFinished());
+        assertFalse(reader.isDestroyed());
         reader.destroy();
-        verify(resultSet).close();
-        verify(preparedStatement).close();
-        verify(conn).close();
-        assertTrue(reader.isFinished());
+        verify(executorService).shutdownNow();
+        verify(sqlServerSnapshot).close();
+        assertTrue(reader.isDestroyed());
     }
 
     /**
@@ -192,10 +203,7 @@ public class TestSQLServerReader {
     @Test
     public void testFinishRead() throws Exception {
         assertFalse(reader.isFinished());
-        reader.destroy();
-        verify(resultSet).close();
-        verify(preparedStatement).close();
-        verify(conn).close();
+        reader.finishRead();
         assertTrue(reader.isFinished());
     }
 
@@ -212,7 +220,9 @@ public class TestSQLServerReader {
      */
     @Test
     public void testGetSnapshot() {
-        assertEquals(StringUtils.EMPTY, reader.getSnapshot());
+        final String snapShort = "snapShort";
+        when(sqlServerSnapshot.getSnapshot()).thenReturn(snapShort);
+        assertEquals(snapShort, reader.getSnapshot());
     }
 
     /**
@@ -220,6 +230,6 @@ public class TestSQLServerReader {
      */
     @Test
     public void testGetReadSource() {
-        assertEquals(sql, reader.getReadSource());
+        assertEquals(instanceId, reader.getReadSource());
     }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 97646ccbf..90b71df9a 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -17,9 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.common.metric.MetricItem;
@@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 import static org.powermock.api.support.membermodification.MemberMatcher.field;
@@ -84,20 +81,10 @@ public class TestSQLServerSource {
      */
     @Test
     public void testSplit() {
-        final String sql1 = "select * from dbo.test01";
-        final String sql2 = "select * from dbo.test${01,99}";
 
         // build mock
-        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn("test_group");
-        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn("test_stream");
-        when(jobProfile.get(eq(SQLServerSource.JOB_DATABASE_SQL), eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
-                sql1, sql2);
-
         final SQLServerSource source = new SQLServerSource();
-
         // assert
-        assertEquals(null, source.split(jobProfile));
         assertEquals(1, source.split(jobProfile).size());
-        assertEquals(99, source.split(jobProfile).size());
     }
 }
diff --git a/pom.xml b/pom.xml
index eadd23c91..7b5c185f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -606,6 +606,12 @@
                 <version>${debezium.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.debezium</groupId>
+                <artifactId>debezium-connector-sqlserver</artifactId>
+                <version>${debezium.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>io.debezium</groupId>
                 <artifactId>debezium-connector-mongodb</artifactId>


[inlong] 03/05: [INLONG-6440][Manager] Fix heartbeat information error for IP (#6444)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 448dada0a88907d40a604c706daaa5caad2dace7
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Tue Nov 8 11:39:09 2022 +0800

    [INLONG-6440][Manager] Fix heartbeat information error for IP (#6444)
---
 .../org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index d38a92510..010afee02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -153,6 +153,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
 
         // protocolType may be null, and the protocolTypes' length may be less than ports' length
         String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+        String[] ips = heartbeat.getIp().split(InlongConstants.COMMA);
         String protocolType = heartbeat.getProtocolType();
         String[] protocolTypes = null;
         if (StringUtils.isNotBlank(protocolType) && ports.length > 1) {
@@ -167,6 +168,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
             HeartbeatMsg heartbeatMsg = JsonUtils.parseObject(JsonUtils.toJsonByte(heartbeat), HeartbeatMsg.class);
             assert heartbeatMsg != null;
             heartbeatMsg.setPort(ports[i].trim());
+            heartbeatMsg.setIp(ips[i].trim());
             if (protocolTypes != null) {
                 heartbeatMsg.setProtocolType(protocolTypes[i]);
             } else {


[inlong] 02/05: [INLONG-6418][Dashboard] Support management of MongoDB source (#6454)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit cbe2af99859a514c749b138e8476d8ec31e0b770
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Tue Nov 8 11:35:24 2022 +0800

    [INLONG-6418][Dashboard] Support management of MongoDB source (#6454)
---
 inlong-dashboard/src/locales/cn.json               |  6 ++
 inlong-dashboard/src/locales/en.json               |  6 ++
 .../src/metas/sources/defaults/Mongodb.ts          | 95 ++++++++++++++++++++++
 .../src/metas/sources/defaults/index.ts            |  5 ++
 4 files changed, 112 insertions(+)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 91e9e36d7..3d74445f9 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -33,6 +33,12 @@
   "meta.Sources.Db.AllMigration": "是否整库迁移",
   "meta.Sources.Db.TableWhiteList": "表名白名单",
   "meta.Sources.Db.TableWhiteListHelp": "白名单应该是一个以逗号分隔的正则表达式列表,与要监控的表的完全限定名称相匹配。表的完全限定名称的格式为 <dbName>.<tableName>,其中 dbName 和 tablename 都可以配置正则表达式。比如:test_db.table*,inlong_db*.user*,表示采集 test_db 库中以 table 开头的所有表 + 以 inlong_db 开头的所有库下的以 user 开头的所有表。",
+  "meta.Sources.Mongodb.Hosts": "服务器主机",
+  "meta.Sources.Mongodb.Username": "用户名",
+  "meta.Sources.Mongodb.Password": "密码",
+  "meta.Sources.Mongodb.Database": "数据库名",
+  "meta.Sources.Mongodb.Collection": "集合名称",
+  "meta.Sources.Mongodb.PrimaryKey": "主键",
   "meta.Sinks.SinkName": "名称",
   "meta.Sinks.SinkNameRule": "以英文字母开头,只能包含英文字母、数字、中划线、下划线",
   "meta.Sinks.SinkType": "类型",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index e4d5062f4..5c7ba9880 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -33,6 +33,12 @@
   "meta.Sources.Db.AllMigration": "AllMigration",
   "meta.Sources.Db.TableWhiteList": "Table WhiteList",
   "meta.Sources.Db.TableWhiteListHelp": "The whitelist should be a comma-separated list of regular expressions that match the fully-qualified names of tables to be monitored. Fully-qualified names for tables are of the form <databaseName>.<tableName>, the dbName and tableName can be configured with regular expressions. For example: test_db.order*, inlong_db*.user*, means to collect all tables starting with order in the test_db database + all tables starting with user under all databases  [...]
+  "meta.Sources.Mongodb.Hosts": "Hosts",
+  "meta.Sources.Mongodb.Username": "Username",
+  "meta.Sources.Mongodb.Password": "Password",
+  "meta.Sources.Mongodb.Database": "Database",
+  "meta.Sources.Mongodb.Collection": "Collection",
+  "meta.Sources.Mongodb.PrimaryKey": "PrimaryKey",
   "meta.Sinks.SinkName": "Name",
   "meta.Sinks.SinkNameRule": "At the beginning of English letters, only English letters, numbers, minus, and underscores",
   "meta.Sinks.SinkType": "Type",
diff --git a/inlong-dashboard/src/metas/sources/defaults/Mongodb.ts b/inlong-dashboard/src/metas/sources/defaults/Mongodb.ts
new file mode 100644
index 000000000..bfea4b2da
--- /dev/null
+++ b/inlong-dashboard/src/metas/sources/defaults/Mongodb.ts
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { DataWithBackend } from '@/metas/DataWithBackend';
+import { RenderRow } from '@/metas/RenderRow';
+import { RenderList } from '@/metas/RenderList';
+import { SourceInfo } from '../common/SourceInfo';
+
+const { I18n } = DataWithBackend;
+const { FieldDecorator } = RenderRow;
+const { ColumnDecorator } = RenderList;
+
+export default class MongodbSource
+  extends SourceInfo
+  implements DataWithBackend, RenderRow, RenderList
+{
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+      placeholder: 'localhost:27017,localhost:27018',
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.Mongodb.Hosts')
+  hosts: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.Mongodb.Username')
+  username: string;
+
+  @FieldDecorator({
+    type: 'password',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Mongodb.Password')
+  password: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Mongodb.Database')
+  database: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Mongodb.Collection')
+  collection: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Mongodb.PrimaryKey')
+  primaryKey: string;
+}
diff --git a/inlong-dashboard/src/metas/sources/defaults/index.ts b/inlong-dashboard/src/metas/sources/defaults/index.ts
index 9e1720de2..8f2843153 100644
--- a/inlong-dashboard/src/metas/sources/defaults/index.ts
+++ b/inlong-dashboard/src/metas/sources/defaults/index.ts
@@ -41,4 +41,9 @@ export const allDefaultSources: MetaExportWithBackendList<SourceMetaType> = [
     value: 'MYSQL_BINLOG',
     LoadEntity: () => import('./MySQLBinlog'),
   },
+  {
+    label: 'Mongodb',
+    value: 'MONGODB',
+    LoadEntity: () => import('./Mongodb'),
+  },
 ];