You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/02 05:31:32 UTC

[incubator-inlong] branch master updated: [INLONG-4358][Manager] Support SqlServer source (#4373)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e701c58 [INLONG-4358][Manager] Support SqlServer source (#4373)
f5e701c58 is described below

commit f5e701c5884efd1daee7aa68a20794294eb59b01
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Thu Jun 2 13:31:27 2022 +0800

    [INLONG-4358][Manager] Support SqlServer source (#4373)
    
    * [INLONG-4358][Manager] Support SqlServer source
    
    * [INLONG-4358][Manager] Fix code comments
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../apache/inlong/common/enums/TaskTypeEnum.java   |   3 +
 .../inlong/manager/common/enums/SourceType.java    |   4 +-
 .../pojo/source/sqlserver/SqlServerSource.java     |  84 +++++++++++++++++
 .../pojo/source/sqlserver/SqlServerSourceDTO.java  | 103 +++++++++++++++++++++
 .../sqlserver/SqlServerSourceListResponse.java     |  75 +++++++++++++++
 .../source/sqlserver/SqlServerSourceRequest.java   |  73 +++++++++++++++
 .../service/sort/util/ExtractNodeUtils.java        |  77 +++++++++++----
 .../source/sqlserver/SqlServerSourceOperation.java | 101 ++++++++++++++++++++
 .../core/source/SqlServerSourceServiceTest.java    | 101 ++++++++++++++++++++
 9 files changed, 604 insertions(+), 17 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 594ec3d17..0cb210be9 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -27,6 +27,7 @@ public enum TaskTypeEnum {
     PULSAR(5),
     POSTGRES(6),
     ORACLE(7),
+    SQLSERVER(8),
 
     ;
 
@@ -54,6 +55,8 @@ public enum TaskTypeEnum {
                 return POSTGRES;
             case 7:
                 return ORACLE;
+            case 8:
+                return SQLSERVER;
             default:
                 throw new RuntimeException(String.format("Unsupported taskType=%s", taskType));
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 9f0fd2997..f5f244aa7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -34,7 +34,8 @@ public enum SourceType {
     KAFKA("KAFKA", TaskTypeEnum.KAFKA),
     PULSAR("PULSAR", TaskTypeEnum.PULSAR),
     POSTGRES("POSTGRES", TaskTypeEnum.POSTGRES),
-    ORACLE("ORACLE", TaskTypeEnum.ORACLE);
+    ORACLE("ORACLE", TaskTypeEnum.ORACLE),
+    SQLSERVER("SQLSERVER", TaskTypeEnum.SQLSERVER);
 
     public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
     public static final String SOURCE_FILE = "FILE";
@@ -44,6 +45,7 @@ public enum SourceType {
     public static final String SOURCE_PULSAR = "PULSAR";
     public static final String SOURCE_POSTGRES = "POSTGRES";
     public static final String SOURCE_ORACLE = "ORACLE";
+    public static final String SOURCE_SQLSERVER = "SQLSERVER";
 
     @Getter
     private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSource.java
new file mode 100644
index 000000000..86fafc5d6
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSource.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.sqlserver;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * SqlServer source info, convertible to a {@link SqlServerSourceRequest} via {@link #genSourceRequest()}
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "SqlServer source info")
+@JsonTypeDefine(value = SourceType.SOURCE_SQLSERVER)
+public class SqlServerSource extends StreamSource {
+
+    @ApiModelProperty("Username of the SqlServer")
+    private String username;
+
+    @ApiModelProperty("Password of the SqlServer")
+    private String password;
+
+    @ApiModelProperty("Hostname of the SqlServer")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the SqlServer")
+    private int port;
+
+    @ApiModelProperty("Database of the SqlServer")
+    private String database;
+
+    @ApiModelProperty("Schema name of the SqlServer")
+    private String schemaName;
+
+    @ApiModelProperty("Table name of the SqlServer")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+    public SqlServerSource() {
+        this.setSourceType(SourceType.SQLSERVER.getType());
+    }
+
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, SqlServerSourceRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceDTO.java
new file mode 100644
index 000000000..653fe5543
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceDTO.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.sqlserver;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sqlserver source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SqlServerSourceDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("Username of the SqlServer")
+    private String username;
+
+    @ApiModelProperty("Password of the SqlServer")
+    private String password;
+
+    @ApiModelProperty("Hostname of the SqlServer")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the SqlServer")
+    private int port;
+
+    @ApiModelProperty("Database of the SqlServer")
+    private String database;
+
+    @ApiModelProperty("Schema name of the SqlServer")
+    private String schemaName;
+
+    @ApiModelProperty("Table name of the SqlServer")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static SqlServerSourceDTO getFromRequest(SqlServerSourceRequest request) {
+        return SqlServerSourceDTO.builder()
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .hostname(request.getHostname())
+                .port(request.getPort())
+                .database(request.getDatabase())
+                .schemaName(request.getSchemaName())
+                .tableName(request.getTableName())
+                .serverTimezone(request.getServerTimezone())
+                .allMigration(request.isAllMigration())
+                .primaryKey(request.getPrimaryKey())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string
+     */
+    public static SqlServerSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, SqlServerSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceListResponse.java
new file mode 100644
index 000000000..583b0d576
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceListResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.sqlserver;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Response of SqlServer source paging list. NOTE(review): carries the password field as-is, confirm it is intended
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of SqlServer source paging list")
+@JsonTypeDefine(value = SourceType.SOURCE_SQLSERVER)
+public class SqlServerSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty("Username of the SqlServer")
+    private String username;
+
+    @ApiModelProperty("Password of the SqlServer")
+    private String password;
+
+    @ApiModelProperty("Hostname of the SqlServer")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the SqlServer")
+    private int port;
+
+    @ApiModelProperty("Database of the SqlServer")
+    private String database;
+
+    @ApiModelProperty("Schema name of the SqlServer")
+    private String schemaName;
+
+    @ApiModelProperty("Table name of the SqlServer")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+    public SqlServerSourceListResponse() {
+        this.setSourceType(SourceType.SQLSERVER.getType());
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceRequest.java
new file mode 100644
index 000000000..351be9386
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SqlServerSourceRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.sqlserver;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request info of the SqlServer source, its source type is preset to the SqlServer type by the constructor
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the SqlServer source")
+@JsonTypeDefine(value = SourceType.SOURCE_SQLSERVER)
+public class SqlServerSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("Username of the SqlServer")
+    private String username;
+
+    @ApiModelProperty("Password of the SqlServer")
+    private String password;
+
+    @ApiModelProperty("Hostname of the SqlServer")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the SqlServer")
+    private int port;
+
+    @ApiModelProperty("Database of the SqlServer")
+    private String database;
+
+    @ApiModelProperty("Schema name of the SqlServer")
+    private String schemaName;
+
+    @ApiModelProperty("Table name of the SqlServer")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+    public SqlServerSourceRequest() {
+        this.setSourceType(SourceType.SQLSERVER.getType());
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 9c2ddf00a..c31b45991 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
 import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSource;
 import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSource;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
@@ -43,6 +44,7 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -62,7 +64,7 @@ import java.util.stream.Collectors;
 public class ExtractNodeUtils {
 
     /**
-     * Create extract nodes from the source responses.
+     * Create extract nodes from the given sources.
      */
     public static List<ExtractNode> createExtractNodes(List<StreamSource> sourceInfos) {
         if (CollectionUtils.isEmpty(sourceInfos)) {
@@ -85,6 +87,8 @@ public class ExtractNodeUtils {
                 return createExtractNode((PostgresSource) sourceInfo);
             case ORACLE:
                 return createExtractNode((OracleSource) sourceInfo);
+            case SQLSERVER:
+                return createExtractNode((SqlServerSource) sourceInfo);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -92,9 +96,9 @@ public class ExtractNodeUtils {
     }
 
     /**
-     * Create MySqlExtractNode based on MySQLBinlogSource
+     * Create MySql extract node
      *
-     * @param binlogSource binlog source info
+     * @param binlogSource MySql binlog source info
      * @return MySql extract node info
      */
     public static MySqlExtractNode createExtractNode(MySQLBinlogSource binlogSource) {
@@ -149,10 +153,10 @@ public class ExtractNodeUtils {
     }
 
     /**
-     * Create KafkaExtractNode based KafkaSource
+     * Create Kafka extract node
      *
-     * @param kafkaSource kafka source response
-     * @return kafka extract node info
+     * @param kafkaSource Kafka source info
+     * @return Kafka extract node info
      */
     public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
         String id = kafkaSource.getSourceName();
@@ -211,10 +215,10 @@ public class ExtractNodeUtils {
     }
 
     /**
-     * Create PulsarExtractNode based PulsarSource
+     * Create Pulsar extract node
      *
-     * @param pulsarSource pulsar source response
-     * @return pulsar extract node info
+     * @param pulsarSource Pulsar source info
+     * @return Pulsar extract node info
      */
     public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
         String id = pulsarSource.getSourceName();
@@ -270,10 +274,10 @@ public class ExtractNodeUtils {
     }
 
     /**
-     * Create PostgresExtractNode based PostgresSource
+     * Create PostgreSQL extract node
      *
-     * @param postgresSource postgres source response
-     * @return postgres extract node info
+     * @param postgresSource PostgreSQL source info
+     * @return PostgreSQL extract node info
      */
     public static PostgresExtractNode createExtractNode(PostgresSource postgresSource) {
         List<StreamField> streamFields = postgresSource.getFieldList();
@@ -291,9 +295,9 @@ public class ExtractNodeUtils {
     }
 
     /**
-     * Create oracleExtractNode based on OracleSourceResponse
+     * Create Oracle extract node
      *
-     * @param oracleSource oracle source response info
+     * @param oracleSource Oracle source info
      * @return oracle extract node info
      */
     public static OracleExtractNode createExtractNode(OracleSource oracleSource) {
@@ -327,7 +331,48 @@ public class ExtractNodeUtils {
                 schemaName,
                 tableName,
                 port,
-                scanStartupMode
-                );
+                scanStartupMode);
     }
+
+    /**
+     * Convert a SqlServer stream source into the matching Sort extract node
+     *
+     * @param sqlServerSource SqlServer source info, its field list is read to build the node fields
+     * @return SqlServer extract node info, using the source name as both node id and node name
+     */
+    public static SqlServerExtractNode createExtractNode(SqlServerSource sqlServerSource) {
+        // The source name serves as the node id and as the node name
+        final String nodeName = sqlServerSource.getSourceName();
+
+        // Translate every stream field into a field info bound to this node
+        final List<FieldInfo> nodeFields = sqlServerSource.getFieldList().stream()
+                .map(streamField -> FieldInfoUtils.parseStreamFieldInfo(streamField, nodeName))
+                .collect(Collectors.toList());
+
+        // Connection settings of the SqlServer instance
+        final String host = sqlServerSource.getHostname();
+        final Integer port = sqlServerSource.getPort();
+        final String user = sqlServerSource.getUsername();
+        final String secret = sqlServerSource.getPassword();
+
+        // The node is created with an empty property map
+        final Map<String, String> nodeProperties = Maps.newHashMap();
+
+        return new SqlServerExtractNode(
+                nodeName,
+                nodeName,
+                nodeFields,
+                null,
+                nodeProperties,
+                sqlServerSource.getPrimaryKey(),
+                host,
+                port,
+                user,
+                secret,
+                sqlServerSource.getDatabase(),
+                sqlServerSource.getSchemaName(),
+                sqlServerSource.getTableName(),
+                sqlServerSource.getServerTimezone());
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SqlServerSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SqlServerSourceOperation.java
new file mode 100644
index 000000000..38017c19a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SqlServerSourceOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.sqlserver;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSource;
+import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSourceRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * SqlServer source operation: converts between source requests, stored entities (JSON ext params) and responses
+ */
+@Service
+public class SqlServerSourceOperation extends AbstractSourceOperation {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return SourceType.SQLSERVER == sourceType;
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.SQLSERVER.getType();
+    }
+
+    @Override
+    protected StreamSource getSource() {
+        return new SqlServerSource();
+    }
+
+    @Override
+    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, SqlServerSourceListResponse::new));
+    }
+
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        SqlServerSourceRequest sourceRequest = (SqlServerSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            SqlServerSourceDTO dto = SqlServerSourceDTO.getFromRequest(sourceRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSourceType();
+        Preconditions.checkTrue(getSourceType().equals(existType),
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+        SqlServerSourceDTO dto = SqlServerSourceDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+        return result;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
new file mode 100644
index 000000000..898c97361
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.source;
+
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSource;
+import org.apache.inlong.manager.common.pojo.source.sqlserver.SqlServerSourceRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * SqlServer source service test, covering save, get, update and delete of a SqlServer source.
+ */
+public class SqlServerSourceServiceTest extends ServiceBaseTest {
+
+    private final String sourceName = "stream_source_service_test";
+    private static final String hostname = "127.0.0.1";
+    private static final Integer port = 1433;
+    private static final String database = "sqlserver_database";
+    private static final String schema = "sqlserver_schema";
+    private static final String tablename = "sqlserver_tablename";
+
+    @Autowired
+    private StreamSourceService sourceService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save a SqlServer source under the global test group and stream, and return the saved source id.
+     */
+    public Integer saveSource() {
+        streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
+
+        SqlServerSourceRequest sourceInfo = new SqlServerSourceRequest();
+        sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
+        sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
+        sourceInfo.setSourceName(sourceName);
+        sourceInfo.setSourceType(SourceType.SQLSERVER.getType());
+        sourceInfo.setHostname(hostname);
+        sourceInfo.setDatabase(database);
+        sourceInfo.setTableName(tablename);
+        sourceInfo.setPort(port);
+        sourceInfo.setSchemaName(schema);
+        return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
+    }
+
+    @Test
+    public void testSaveAndDelete() {
+        Integer id = this.saveSource();
+        Assert.assertNotNull(id);
+
+        boolean result = sourceService.delete(id, GLOBAL_OPERATOR);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        Integer id = this.saveSource();
+        StreamSource source = sourceService.get(id);
+        Assert.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId());
+
+        sourceService.delete(id, GLOBAL_OPERATOR);
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        Integer id = this.saveSource();
+        StreamSource response = sourceService.get(id);
+        Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
+
+        SqlServerSource sqlserverSource = (SqlServerSource) response;
+
+        SqlServerSourceRequest request = CommonBeanUtils.copyProperties(sqlserverSource, SqlServerSourceRequest::new);
+        boolean result = sourceService.update(request, GLOBAL_OPERATOR);
+        Assert.assertTrue(result);
+
+        sourceService.delete(id, GLOBAL_OPERATOR);
+    }
+
+}