You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 03:55:02 UTC

[inlong] branch master updated: [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d07eb5165 [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)
d07eb5165 is described below

commit d07eb51652f9e808c33580a890c487b8dcbbd47f
Author: haibo.duan <dh...@live.cn>
AuthorDate: Tue Nov 8 11:54:57 2022 +0800

    [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)
---
 .../inlong/agent/constant/SqlServerConstants.java  |  39 +++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  39 +++
 .../org/apache/inlong/agent/pojo/SqlServerJob.java |  76 +++++
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../agent/plugin/sources/SQLServerSource.java      |  34 +-
 .../plugin/sources/reader/SQLServerReader.java     | 344 ++++++++++++++-------
 .../sources/snapshot/SqlServerSnapshotBase.java    |  52 ++++
 .../agent/plugin/sources/TestSQLServerConnect.java |  48 +--
 .../agent/plugin/sources/TestSQLServerReader.java  | 156 +++++-----
 .../agent/plugin/sources/TestSQLServerSource.java  |  13 -
 pom.xml                                            |   6 +
 11 files changed, 556 insertions(+), 256 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java
new file mode 100644
index 000000000..ffb4d7ae8
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Snapshot mode values for a SQL Server collection job.
+ * This class only holds constants and is not meant to be instantiated or extended.
+ */
+public final class SqlServerConstants {
+
+    /**
+     * Takes a snapshot of structure and data of captured tables;
+     * useful if topics should be populated with a complete representation of the data from the captured tables.
+     */
+    public static final String INITIAL = "initial";
+
+    /**
+     * Takes a snapshot of structure and data like initial
+     * but instead does not transition into streaming changes once the snapshot has completed.
+     */
+    public static final String INITIAL_ONLY = "initial_only";
+
+    /**
+     * Takes a snapshot of the structure of captured tables only;
+     * useful if only changes happening from now onwards should be propagated to topics.
+     */
+    public static final String SCHEMA_ONLY = "schema_only";
+
+    /**
+     * Constants holder; no instances.
+     */
+    private SqlServerConstants() {
+    }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index d57b2c341..810a42d2b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -58,6 +58,10 @@ public class JobProfileDto {
      * mqtt source
      */
     public static final String MQTT_SOURCE = "org.apache.inlong.agent.plugin.sources.MqttSource";
+    /**
+     * sqlserver source (fully qualified, case-sensitive class name of SQLServerSource)
+     */
+    public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLServerSource";
 
     private static final Gson GSON = new Gson();
 
@@ -226,6 +230,34 @@ public class JobProfileDto {
         return mongoJob;
     }
 
+    /**
+     * Build the SQL Server job section from the JSON ext params of the given data config.
+     *
+     * @param dataConfigs data config whose ext params are parsed as a {@link SqlServerJob.SqlserverJobConfig}
+     * @return job holding the connection settings plus its offset, snapshot and history sections
+     */
+    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+        SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+                SqlServerJob.SqlserverJobConfig.class);
+        SqlServerJob sqlServerJob = new SqlServerJob();
+        sqlServerJob.setUser(config.getUser());
+        sqlServerJob.setHostname(config.getHostname());
+        sqlServerJob.setPassword(config.getPassword());
+        sqlServerJob.setPort(config.getPort());
+        sqlServerJob.setServerName(config.getServerName());
+        sqlServerJob.setDbname(config.getDbname());
+
+        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        // carry the flush interval over too; it is part of the config but was never copied into the offset section
+        offset.setIntervalMs(config.getIntervalMs());
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        sqlServerJob.setOffset(offset);
+
+        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        sqlServerJob.setSnapshot(snapshot);
+
+        SqlServerJob.History history = new SqlServerJob.History();
+        history.setFilename(config.getHistoryFilename());
+        sqlServerJob.setHistory(history);
+
+        return sqlServerJob;
+    }
+
     public static MqttJob getMqttJob(DataConfig dataConfigs) {
         MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
                 MqttJob.MqttJobConfig.class);
@@ -309,6 +341,12 @@ public class JobProfileDto {
                 job.setSource(KAFKA_SOURCE);
                 profileDto.setJob(job);
                 break;
+            case SQLSERVER:
+                SqlServerJob sqlserverJob = getSqlServerJob(dataConfig);
+                job.setSqlserverJob(sqlserverJob);
+                job.setSource(SQLSERVER_SOURCE);
+                profileDto.setJob(job);
+                break;
             case MONGODB:
                 MongoJob mongoJob = getMongoJob(dataConfig);
                 job.setMongoJob(mongoJob);
@@ -349,6 +387,7 @@ public class JobProfileDto {
         private KafkaJob kafkaJob;
         private MongoJob mongoJob;
         private MqttJob mqttJob;
+        private SqlServerJob sqlserverJob;
     }
 
     @Data
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java
new file mode 100644
index 000000000..735c745bb
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class SqlServerJob {
+    /*
+     * SQL Server section of an agent job. Connection settings come first; every value is kept as a string.
+     * Lombok's @Data supplies the getters and setters used by callers.
+     */
+    private String hostname;
+    private String user;
+    private String password;
+    private String port;
+    private String serverName;
+    private String dbname;
+    /* Nested sections for snapshot, offset and history settings. */
+    private SqlServerJob.Snapshot snapshot;
+    private SqlServerJob.Offset offset;
+    private SqlServerJob.History history;
+    /**
+     * Offset settings: flush interval, offset file name and an optional specific offset (file and position).
+     */
+    @Data
+    public static class Offset {
+
+        private String intervalMs;
+        private String filename;
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+    /**
+     * Snapshot settings; holds only the snapshot mode.
+     */
+    @Data
+    public static class Snapshot {
+
+        private String mode;
+    }
+    /**
+     * History settings; holds only the history file name.
+     */
+    @Data
+    public static class History {
+
+        private String filename;
+    }
+    /**
+     * Flat form of the same settings, with one field per value instead of nested sections.
+     * NOTE(review): presumably the shape of the JSON ext params this job is built from - confirm against the caller.
+     */
+    @Data
+    public static class SqlserverJobConfig {
+
+        private String hostname;
+        private String user;
+        private String password;
+        private String port;
+        private String dbname;
+        private String serverName;
+        /* snapshot mode, offset interval and the offset/history file names */
+        private String snapshotMode;
+        private String intervalMs;
+        private String offsetFilename;
+        private String historyFilename;
+        /* optional specific offset (file and position) */
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index b51fd1963..fa8e03a41 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -85,6 +85,11 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-sqlserver</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-mongodb</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index af6d04a93..a65006152 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -17,56 +17,32 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
-import org.apache.inlong.agent.utils.AgentDbUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 
 /**
- * SQLServer SQL source, split SQLServer SQL source job into multi readers
+ * SQLServer source
  */
 public class SQLServerSource extends AbstractSource {
 
     private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class);
 
-    public static final String JOB_DATABASE_SQL = "job.sql.command";
-
     public SQLServerSource() {
     }
 
-    private List<Reader> splitSqlJob(String sqlPattern) {
-        final List<Reader> result = new ArrayList<>();
-        String[] sqlList = AgentDbUtils.replaceDynamicSeq(sqlPattern);
-        if (Objects.nonNull(sqlList)) {
-            Arrays.stream(sqlList).forEach(sql -> {
-                result.add(new SQLServerReader(sql));
-            });
-        }
-        return result;
-    }
-
     @Override
     public List<Reader> split(JobProfile conf) {
         super.init(conf);
-        String sqlPattern = conf.get(JOB_DATABASE_SQL, StringUtils.EMPTY).toLowerCase();
-        List<Reader> readerList = null;
-        if (StringUtils.isNotEmpty(sqlPattern)) {
-            readerList = splitSqlJob(sqlPattern);
-        }
-        if (CollectionUtils.isNotEmpty(readerList)) {
-            sourceMetric.sourceSuccessCount.incrementAndGet();
-        } else {
-            sourceMetric.sourceFailCount.incrementAndGet();
-        }
+        Reader sqlServerReader = new SQLServerReader();
+        List<Reader> readerList = new ArrayList<>();
+        readerList.add(sqlServerReader);
+        sourceMetric.sourceSuccessCount.incrementAndGet();
         return readerList;
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index c4daa8560..079d9af07 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -17,114 +17,129 @@
 
 package org.apache.inlong.agent.plugin.sources.reader;
 
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang3.CharUtils;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import io.debezium.connector.sqlserver.SqlServerConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.relational.history.FileDatabaseHistory;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
+import org.apache.inlong.agent.utils.GsonUtil;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static java.sql.Types.BINARY;
-import static java.sql.Types.BLOB;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.VARBINARY;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
 
 /**
- * Read data from SQLServer database by SQL
+ * Read data from SQLServer database by Debezium
  */
 public class SQLServerReader extends AbstractReader {
 
     public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
-    public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
-    public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
     public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
     public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
+    public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
+    public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
     public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
-    public static final String JOB_DATABASE_BATCH_SIZE = "job.sqlserverJob.batchSize";
-    public static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000;
-    public static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass";
-    public static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
-    public static final String STD_FIELD_SEPARATOR_SHORT = "\001";
-    public static final String JOB_DATABASE_SEPARATOR = "job.sql.separator";
-    // pre-set sql lines, commands like "set xxx=xx;"
-    public static final String JOB_DATABASE_TYPE = "job.database.type";
-    public static final String SQLSERVER = "sqlserver";
+    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode";
+    public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize";
+    public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos";
+
+    public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName";
+
+    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
+    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
+
     private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
-    private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR),
-            String.valueOf(CharUtils.LF)};
-    private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY};
+    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
 
-    private final String sql;
+    private static final Gson GSON = new Gson();
 
-    private PreparedStatement preparedStatement;
-    private Connection conn;
-    private ResultSet resultSet;
-    private int columnCount;
+    private String databaseStoreHistoryName;
+    private String instanceId;
+    private String dbName;
+    private String serverName;
+    private String userName;
+    private String password;
+    private String hostName;
+    private String port;
+    private String offsetFlushIntervalMs;
+    private String offsetStoreFileName;
+    private String snapshotMode;
+    private String offset;
+    private String specificOffsetFile;
+    private String specificOffsetPos;
 
-    // column types
-    private String[] columnTypeNames;
-    private int[] columnTypeCodes;
+    private ExecutorService executor;
+    private SqlServerSnapshotBase sqlServerSnapshot;
     private boolean finished = false;
-    private String separator;
 
-    public SQLServerReader(String sql) {
-        this.sql = sql;
+    private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
+    private JobProfile jobProfile;
+    private boolean destroyed = false;
+
+    public SQLServerReader() {
+
     }
 
     @Override
     public Message read() {
-        try {
-            if (!resultSet.next()) {
-                finished = true;
-                return null;
-            }
-            final List<String> lineColumns = new ArrayList<>();
-            for (int i = 1; i <= columnCount; i++) {
-                final String dataValue;
-                /* handle special blob value, encode with base64, BLOB=2004 */
-                final int typeCode = columnTypeCodes[i - 1];
-                final String typeName = columnTypeNames[i - 1];
-
-                // binary type
-                if (typeCode == BLOB || typeCode == BINARY || typeCode == VARBINARY
-                        || typeCode == LONGVARBINARY || typeName.contains("BLOB")) {
-                    final byte[] data = resultSet.getBytes(i);
-                    dataValue = new String(Base64.encodeBase64(data, false), StandardCharsets.UTF_8);
-                } else {
-                    // non-binary type
-                    dataValue = StringUtils.replaceEachRepeatedly(resultSet.getString(i),
-                            NEW_LINE_CHARS, EMPTY_CHARS);
-                }
-                lineColumns.add(dataValue);
-            }
-            long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                    System.currentTimeMillis(), 1, dataSize);
-            readerMetric.pluginReadSuccessCount.incrementAndGet();
-            readerMetric.pluginReadCount.incrementAndGet();
-            return generateMessage(lineColumns);
-        } catch (Exception ex) {
-            LOGGER.error("error while reading data", ex);
-            readerMetric.pluginReadFailCount.incrementAndGet();
-            readerMetric.pluginReadCount.incrementAndGet();
-            throw new RuntimeException(ex);
+        if (!sqlServerMessageQueue.isEmpty()) {
+            return getSqlServerMessage();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Take the next buffered change event, if any, and wrap it as a message.
+     *
+     * @return message whose header stores the pair's key under {@code PROXY_KEY_DATA} and whose body is the
+     *         UTF-8 bytes of the pair's value passed through {@code GsonUtil.toJson};
+     *         {@code null} when nothing is buffered
+     */
+    private DefaultMessage getSqlServerMessage() {
+        // poll() does not wait: it hands back null when the queue is empty
+        final Pair<String, String> event = sqlServerMessageQueue.poll();
+        if (event == null) {
+            return null;
+        }
+        final Map<String, String> headers = new HashMap<>(DEFAULT_MAP_CAPACITY);
+        headers.put(PROXY_KEY_DATA, event.getKey());
+        final String payload = GsonUtil.toJson(event.getValue());
+        final byte[] body = payload.getBytes(StandardCharsets.UTF_8);
+        return new DefaultMessage(body, headers);
+    }
 
-    private Message generateMessage(List<String> lineColumns) {
-        return new DefaultMessage(StringUtils.join(lineColumns, separator).getBytes(StandardCharsets.UTF_8));
+    public boolean isDestroyed() {
+        return destroyed;
     }
 
     @Override
@@ -134,7 +149,7 @@ public class SQLServerReader extends AbstractReader {
 
     @Override
     public String getReadSource() {
-        return sql;
+        return instanceId;
     }
 
     @Override
@@ -149,12 +164,16 @@ public class SQLServerReader extends AbstractReader {
 
     @Override
     public String getSnapshot() {
-        return StringUtils.EMPTY;
+        if (sqlServerSnapshot != null) {
+            return sqlServerSnapshot.getSnapshot();
+        } else {
+            return StringUtils.EMPTY;
+        }
     }
 
     @Override
     public void finishRead() {
-        destroy();
+        this.finished = true;
     }
 
     @Override
@@ -162,59 +181,146 @@ public class SQLServerReader extends AbstractReader {
         return true;
     }
 
+    /**
+     * Look up the agent home and history path in the agent configuration, pass both to
+     * {@code AgentUtils.makeDirsIfNotExist} (presumably creating the directory when it is missing)
+     * and return the absolute path of the result.
+     *
+     * @return absolute path of the history directory
+     */
+    private String tryToInitAndGetHistoryPath() {
+        final String agentHome = agentConf.get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+        final String historyDir = agentConf.get(
+                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+        return AgentUtils.makeDirsIfNotExist(historyDir, agentHome).getAbsolutePath();
+    }
+
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
-        String userName = jobConf.get(JOB_DATABASE_USER);
-        String password = jobConf.get(JOB_DATABASE_PASSWORD);
-        String hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
-        String dbname = jobConf.get(JOB_DATABASE_DBNAME);
-        int port = jobConf.getInt(JOB_DATABASE_PORT);
-
-        String driverClass = jobConf.get(JOB_DATABASE_DRIVER_CLASS,
-                DEFAULT_JOB_DATABASE_DRIVER_CLASS);
-        separator = jobConf.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT);
+        jobProfile = jobConf;
+        LOGGER.info("init SqlServer reader with jobConf {}", jobConf.toJsonStr());
+        userName = jobConf.get(JOB_DATABASE_USER);
+        password = jobConf.get(JOB_DATABASE_PASSWORD);
+        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+        port = jobConf.get(JOB_DATABASE_PORT);
+        dbName = jobConf.get(JOB_DATABASE_DBNAME);
+        serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
+        instanceId = jobConf.getInstanceId();
+        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
+        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
+        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL);
+        sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
         finished = false;
+
+        databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
+        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+        specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+        specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+        sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName);
+        sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile());
+
+        Properties props = getEngineProps();
+
+        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
+                        io.debezium.engine.format.Json.class)
+                .using(props)
+                .notifying((records, committer) -> {
+                    try {
+                        for (ChangeEvent<String, String> record : records) {
+                            DebeziumFormat debeziumFormat = GSON
+                                    .fromJson(record.value(), DebeziumFormat.class);
+                            sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
+                            committer.markProcessed(record);
+                        }
+                        committer.markBatchFinished();
+                        long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                System.currentTimeMillis(), records.size(), dataSize);
+                        readerMetric.pluginReadSuccessCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                    } catch (Exception e) {
+                        readerMetric.pluginReadFailCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                        LOGGER.error("parse SqlServer message error", e);
+                    }
+                })
+                .using((success, message, error) -> {
+                    if (!success) {
+                        LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error);
+                    }
+                }).build();
+
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(engine);
+
+        LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
+    }
+
+    /**
+     * Serialize the configured specific offset (file and position) together with a source partition
+     * keyed by the instance id.
+     *
+     * @return serialized offset decoded as UTF-8, or an empty string when serialization fails (the failure is logged)
+     * @throws NullPointerException if the specific offset file or position is null
+     */
+    private String serializeOffset() {
+        Map<String, Object> sourceOffset = new HashMap<>();
+        // validate the configured values, not the key-name constants (which can never be null)
+        Preconditions.checkNotNull(specificOffsetFile,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " shouldn't be null");
+        sourceOffset.put("file", specificOffsetFile);
+        Preconditions.checkNotNull(specificOffsetPos,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
+        sourceOffset.put("pos", specificOffsetPos);
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", instanceId);
+        specificOffset.setSourcePartition(sourcePartition);
+        byte[] serializedOffset = new byte[0];
         try {
-            String databaseType = jobConf.get(JOB_DATABASE_TYPE, SQLSERVER);
-            String url = String.format("jdbc:%s://%s:%d;databaseName=%s;", databaseType, hostName, port, dbname);
-            conn = AgentDbUtils.getConnectionFailover(driverClass, url, userName, password);
-            preparedStatement = conn.prepareStatement(sql);
-            preparedStatement.setFetchSize(batchSize);
-            resultSet = preparedStatement.executeQuery();
-
-            initColumnMeta();
-        } catch (Exception ex) {
-            LOGGER.error("error create statement", ex);
-            destroy();
-            throw new RuntimeException(ex);
+            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        } catch (IOException e) {
+            // best effort: keep the empty default so the caller still receives a string
+            LOGGER.error("serialize offset message error", e);
         }
+        return new String(serializedOffset, StandardCharsets.UTF_8);
     }
 
-    /**
-     * Init column meta data.
-     *
-     * @throws Exception - sql exception
-     */
-    private void initColumnMeta() throws Exception {
-        columnCount = resultSet.getMetaData().getColumnCount();
-        columnTypeNames = new String[columnCount];
-        columnTypeCodes = new int[columnCount];
-        for (int i = 0; i < columnCount; i++) {
-            columnTypeCodes[i] = resultSet.getMetaData().getColumnType(i + 1);
-            String t = resultSet.getMetaData().getColumnTypeName(i + 1);
-            if (t != null) {
-                columnTypeNames[i] = t.toUpperCase();
-            }
+    private Properties getEngineProps() {
+        Properties props = new Properties();
+        props.setProperty("name", "engine" + instanceId);
+        props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
+        props.setProperty("database.hostname", hostName);
+        props.setProperty("database.port", port);
+        props.setProperty("database.user", userName);
+        props.setProperty("database.password", password);
+        props.setProperty("database.dbname", dbName);
+        props.setProperty("database.server.name", serverName);
+        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+        props.setProperty("database.snapshot.mode", snapshotMode);
+        props.setProperty("key.converter.schemas.enable", "false");
+        props.setProperty("value.converter.schemas.enable", "false");
+        props.setProperty("snapshot.mode", snapshotMode);
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        props.setProperty("database.history.file.filename", databaseStoreHistoryName);
+        if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+            props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
+        } else {
+            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
         }
+        props.setProperty("tombstones.on.delete", "false");
+        props.setProperty("converters", "datetime");
+        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
+        props.setProperty("datetime.format.date", "yyyy-MM-dd");
+        props.setProperty("datetime.format.time", "HH:mm:ss");
+        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+
+        LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), props);
+        return props;
     }
 
     @Override
     public void destroy() {
-        finished = true;
-        AgentUtils.finallyClose(resultSet);
-        AgentUtils.finallyClose(preparedStatement);
-        AgentUtils.finallyClose(conn);
+        synchronized (this) {
+            if (!destroyed) {
+                this.executor.shutdownNow();
+                this.sqlServerSnapshot.close();
+                this.destroyed = true;
+            }
+        }
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java
new file mode 100644
index 000000000..62ee38618
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * File-backed snapshot for the SQL Server source: reads the offset file and returns it encoded as a string.
+ */
+public class SqlServerSnapshotBase extends AbstractSnapshot {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotBase.class);
+    private final File file;
+
+    public SqlServerSnapshotBase(String filePath) {
+        file = new File(filePath);
+    }
+
+    @Override
+    public String getSnapshot() {
+        byte[] offset = this.load(this.file);
+        return ENCODER.encodeToString(offset);
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release here: this class only holds a File reference and opens nothing itself.
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
index b278abb73..58f83e2f4 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
@@ -18,43 +18,47 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
 import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Objects;
+import java.util.UUID;
 
-import static org.junit.Assert.assertNotNull;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 
 /**
  * Test cases for {@link SQLServerReader}.
  */
 public class TestSQLServerConnect {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestSQLServerConnect.class);
+
     /**
      * Just using in local test.
      */
+
     @Ignore
-    public void testSQLServerReader() {
-        JobProfile jobProfile = JobProfile.parseJsonStr("{}");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_USER, "sa");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_PASSWORD, "123456");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_HOSTNAME, "127.0.0.1");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_PORT, "1434");
-        jobProfile.set(SQLServerReader.JOB_DATABASE_DBNAME, "inlong");
-        final String sql = "select * from dbo.test01";
-        jobProfile.set(SQLServerSource.JOB_DATABASE_SQL, sql);
-        final SQLServerSource source = new SQLServerSource();
-        List<Reader> readers = source.split(jobProfile);
-        for (Reader reader : readers) {
-            reader.init(jobProfile);
-            while (!reader.isFinished()) {
-                Message message = reader.read();
-                if (Objects.nonNull(message)) {
-                    assertNotNull(message.getBody());
-                }
+    public void testSqlServer() {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set("job.sqlserverJob.hostname", "localhost");
+        jobProfile.set("job.sqlserverJob.port", "1434");
+        jobProfile.set("job.sqlserverJob.user", "sa");
+        jobProfile.set("job.sqlserverJob.password", "123456");
+        jobProfile.set("job.sqlserverJob.dbname", "inlong");
+        jobProfile.set("job.sqlserverJob.serverName", "fullfillment");
+        jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+        SQLServerReader sqlServerReader = new SQLServerReader();
+        sqlServerReader.init(jobProfile);
+        while (true) {
+            Message message = sqlServerReader.read();
+            if (message != null) {
+                LOGGER.info("event content: {}", message);
             }
         }
     }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
index 23f2fc416..33e291fbe 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
@@ -17,15 +17,17 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.commons.lang3.StringUtils;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
-import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,11 +37,10 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.Types;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
@@ -49,7 +50,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.field;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -60,7 +60,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Test cases for {@link SQLServerReader}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({AgentDbUtils.class, MetricRegister.class, AuditUtils.class, SQLServerReader.class})
+@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, SQLServerReader.class})
 @PowerMockIgnore({"javax.management.*"})
 public class TestSQLServerReader {
 
@@ -70,28 +70,31 @@ public class TestSQLServerReader {
     private JobProfile jobProfile;
 
     @Mock
-    private Connection conn;
+    private AgentMetricItemSet agentMetricItemSet;
 
     @Mock
-    private PreparedStatement preparedStatement;
+    private AgentMetricItem agentMetricItem;
 
     @Mock
-    private ResultSet resultSet;
+    private SqlServerSnapshotBase sqlServerSnapshot;
 
     @Mock
-    private ResultSetMetaData metaData;
+    private DebeziumEngine.Builder builder;
 
     @Mock
-    private AgentMetricItemSet agentMetricItemSet;
+    private ExecutorService executorService;
 
     @Mock
-    private AgentMetricItem agentMetricItem;
+    private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
+
+    @Mock
+    private DebeziumEngine<ChangeEvent<String, String>> engine;
 
     private AtomicLong atomicLong;
 
     private AtomicLong atomicCountLong;
 
-    private String sql;
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
 
     @Before
     public void setUp() throws Exception {
@@ -99,44 +102,60 @@ public class TestSQLServerReader {
         final String password = "123456";
         final String hostname = "127.0.0.1";
         final String port = "1434";
-        final String dbname = "inlong";
-        final String typeName1 = "int";
-        final String typeName2 = "varchar";
         final String groupId = "group01";
         final String streamId = "stream01";
+        final String dbName = "inlong";
+        final String serverName = "server1";
+        final String offsetFlushIntervalMs = "1000";
+        final String offsetStoreFileName = "/opt/offset.dat";
+        final String snapshotMode = SqlServerConstants.INITIAL;
+        final int queueSize = 1000;
+        final String databaseStoreHistoryName = "/opt/history.dat";
+        final String offset = "111";
+        final String specificOffsetFile = "";
+        final String specificOffsetPos = "-1";
+
         atomicLong = new AtomicLong(0L);
         atomicCountLong = new AtomicLong(0L);
 
-        sql = "select * from dbo.test01";
-
+        when(jobProfile.getInstanceId()).thenReturn(instanceId);
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId);
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_USER))).thenReturn(username);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PASSWORD))).thenReturn(password);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname);
         when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PORT))).thenReturn(port);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbname);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DRIVER_CLASS), anyString())).thenReturn(
-                SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS);
-        when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_BATCH_SIZE), anyInt())).thenReturn(
-                SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_TYPE), anyString())).thenReturn(
-                SQLServerReader.SQLSERVER);
-        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SEPARATOR), anyString())).thenReturn(
-                SQLServerReader.STD_FIELD_SEPARATOR_SHORT);
-        mockStatic(AgentDbUtils.class);
-        when(AgentDbUtils.getConnectionFailover(eq(SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS), anyString(),
-                eq(username), eq(password))).thenReturn(conn);
-        when(conn.prepareStatement(anyString())).thenReturn(preparedStatement);
-        when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        when(resultSet.getMetaData()).thenReturn(metaData);
-        when(metaData.getColumnCount()).thenReturn(2);
-        when(metaData.getColumnName(1)).thenReturn("id");
-        when(metaData.getColumnName(2)).thenReturn("cell");
-        when(metaData.getColumnType(1)).thenReturn(Types.INTEGER);
-        when(metaData.getColumnType(2)).thenReturn(Types.VARCHAR);
-        when(metaData.getColumnTypeName(1)).thenReturn(typeName1);
-        when(metaData.getColumnTypeName(2)).thenReturn(typeName2);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn(
+                offsetFlushIntervalMs);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn(
+                offsetStoreFileName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode);
+        when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn(
+                databaseStoreHistoryName);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn(
+                specificOffsetFile);
+        when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn(
+                specificOffsetPos);
+        whenNew(SqlServerSnapshotBase.class).withAnyArguments().thenReturn(sqlServerSnapshot);
+
+        //mock sqlServerMessageQueue
+        whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(sqlServerMessageQueue);
+
+        //mock DebeziumEngine
+        mockStatic(DebeziumEngine.class);
+        when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+        when(builder.using(any(Properties.class))).thenReturn(builder);
+        when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+        when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+        when(builder.build()).thenReturn(engine);
+
+        //mock executorService
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
 
         //mock metrics
         whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
@@ -146,7 +165,7 @@ public class TestSQLServerReader {
 
         //init method
         mockStatic(MetricRegister.class);
-        (reader = new SQLServerReader(sql)).init(jobProfile);
+        (reader = new SQLServerReader()).init(jobProfile);
     }
 
     /**
@@ -154,23 +173,16 @@ public class TestSQLServerReader {
      */
     @Test
     public void testRead() throws Exception {
-        final String v11 = "11";
-        final String v12 = "12";
-        final String v21 = "aa";
-        final String v22 = "bb";
-
-        final String msg1 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v11, v12);
-        final String msg2 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v21, v22);
-
-        when(resultSet.next()).thenReturn(true, true, false);
-        when(resultSet.getString(1)).thenReturn(v11, v21);
-        when(resultSet.getString(2)).thenReturn(v12, v22);
-        Message message1 = reader.read();
-        assertEquals(msg1, message1.toString());
-        verify(preparedStatement, times(1)).setFetchSize(SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE);
-        Message message2 = reader.read();
-        assertEquals(msg2, message2.toString());
-        assertEquals(2L, atomicLong.get());
+        final String right = "value";
+        final String left = "key";
+        final String dataKey = "dataKey";
+        when(sqlServerMessageQueue.isEmpty()).thenReturn(true);
+        assertEquals(null, reader.read());
+        when(sqlServerMessageQueue.isEmpty()).thenReturn(false);
+        when(sqlServerMessageQueue.poll()).thenReturn(Pair.of(left, right));
+        Message result = reader.read();
+        assertEquals(String.join(right, "\"", "\""), result.toString());
+        assertEquals(left, result.getHeader().get(dataKey));
     }
 
     /**
@@ -178,12 +190,11 @@ public class TestSQLServerReader {
      */
     @Test
     public void testDestroy() throws Exception {
-        assertFalse(reader.isFinished());
+        assertFalse(reader.isDestroyed());
         reader.destroy();
-        verify(resultSet).close();
-        verify(preparedStatement).close();
-        verify(conn).close();
-        assertTrue(reader.isFinished());
+        verify(executorService).shutdownNow();
+        verify(sqlServerSnapshot).close();
+        assertTrue(reader.isDestroyed());
     }
 
     /**
@@ -192,10 +203,7 @@ public class TestSQLServerReader {
     @Test
     public void testFinishRead() throws Exception {
         assertFalse(reader.isFinished());
-        reader.destroy();
-        verify(resultSet).close();
-        verify(preparedStatement).close();
-        verify(conn).close();
+        reader.finishRead();
         assertTrue(reader.isFinished());
     }
 
@@ -212,7 +220,9 @@ public class TestSQLServerReader {
      */
     @Test
     public void testGetSnapshot() {
-        assertEquals(StringUtils.EMPTY, reader.getSnapshot());
+        final String snapShort = "snapShort";
+        when(sqlServerSnapshot.getSnapshot()).thenReturn(snapShort);
+        assertEquals(snapShort, reader.getSnapshot());
     }
 
     /**
@@ -220,6 +230,6 @@ public class TestSQLServerReader {
      */
     @Test
     public void testGetReadSource() {
-        assertEquals(sql, reader.getReadSource());
+        assertEquals(instanceId, reader.getReadSource());
     }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 97646ccbf..90b71df9a 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -17,9 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.common.metric.MetricItem;
@@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 import static org.powermock.api.support.membermodification.MemberMatcher.field;
@@ -84,20 +81,10 @@ public class TestSQLServerSource {
      */
     @Test
     public void testSplit() {
-        final String sql1 = "select * from dbo.test01";
-        final String sql2 = "select * from dbo.test${01,99}";
 
         // build mock
-        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn("test_group");
-        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn("test_stream");
-        when(jobProfile.get(eq(SQLServerSource.JOB_DATABASE_SQL), eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
-                sql1, sql2);
-
         final SQLServerSource source = new SQLServerSource();
-
         // assert
-        assertEquals(null, source.split(jobProfile));
         assertEquals(1, source.split(jobProfile).size());
-        assertEquals(99, source.split(jobProfile).size());
     }
 }
diff --git a/pom.xml b/pom.xml
index eccb11a27..08785dc82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -606,6 +606,12 @@
                 <version>${debezium.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.debezium</groupId>
+                <artifactId>debezium-connector-sqlserver</artifactId>
+                <version>${debezium.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>io.debezium</groupId>
                 <artifactId>debezium-connector-mongodb</artifactId>