You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/24 02:46:23 UTC

[inlong] branch master updated: [INLONG-4442][Manager] Support DLCIceberg sink (#4700)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fbf34e3f6 [INLONG-4442][Manager] Support DLCIceberg sink (#4700)
fbf34e3f6 is described below

commit fbf34e3f65df58e9a70860482f2f51fb0b488522
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Fri Jun 24 10:46:17 2022 +0800

    [INLONG-4442][Manager] Support DLCIceberg sink (#4700)
---
 .../inlong/manager/common/enums/SinkType.java      |   2 +
 .../common/pojo/sink/dlc/DLCIcebergSink.java       |  64 ++++++++++
 .../common/pojo/sink/dlc/DLCIcebergSinkDTO.java    |  92 ++++++++++++++
 .../pojo/sink/dlc/DLCIcebergSinkListResponse.java  |  58 +++++++++
 .../pojo/sink/dlc/DLCIcebergSinkRequest.java       |  54 +++++++++
 .../service/sink/dlc/DLCIcebergSinkOperation.java  | 133 +++++++++++++++++++++
 .../manager/service/sort/util/LoadNodeUtils.java   |  35 ++++++
 .../core/sink/DLCIcebergStreamSinkServiceTest.java | 106 ++++++++++++++++
 8 files changed, 544 insertions(+)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index c0143946f..0dc050a38 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -34,6 +34,7 @@ public enum SinkType {
     MYSQL,
     ORACLE,
     TDSQLPOSTGRESQL,
+    DLCICEBERG,
 
     ;
 
@@ -50,6 +51,7 @@ public enum SinkType {
     public static final String SINK_MYSQL = "MYSQL";
     public static final String SINK_ORACLE = "ORACLE";
     public static final String SINK_TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
+    public static final String SINK_DLCICEBERG = "DLCICEBERG";
 
     /**
      * Get the SinkType enum via the given sinkType string
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSink.java
new file mode 100644
index 000000000..ff23e12ab
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSink.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.dlc;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * DLCIceberg sink info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "DLCIceberg sink info")
+@JsonTypeDefine(value = SinkType.SINK_DLCICEBERG)
+public class DLCIcebergSink extends StreamSink {
+
+    @ApiModelProperty("Catalog URI")
+    private String catalogUri;
+
+    @ApiModelProperty("Data warehouse")
+    private String warehouse;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty(value = "Primary key")
+    private String primaryKey;
+
+    public DLCIcebergSink() {
+        this.setSinkType(SinkType.SINK_DLCICEBERG);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, DLCIcebergSinkRequest::new);
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkDTO.java
new file mode 100644
index 000000000..fce693a9e
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkDTO.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.dlc;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+import java.util.Map;
+
+/**
+ * DLCIceberg sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class DLCIcebergSinkDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final Logger LOGGER = LoggerFactory.getLogger(DLCIcebergSinkDTO.class);
+
+    @ApiModelProperty("Catalog URI")
+    private String catalogUri;
+
+    @ApiModelProperty("Data warehouse")
+    private String warehouse;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty(value = "Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Properties for DLCIceberg")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static DLCIcebergSinkDTO getFromRequest(DLCIcebergSinkRequest request) {
+        return DLCIcebergSinkDTO.builder()
+                .catalogUri(request.getCatalogUri())
+                .warehouse(request.getWarehouse())
+                .dbName(request.getDbName())
+                .tableName(request.getTableName())
+                .primaryKey(request.getPrimaryKey())
+                .properties(request.getProperties())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from json
+     */
+    public static DLCIcebergSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, DLCIcebergSinkDTO.class);
+        } catch (Exception e) {
+            LOGGER.error("fetch DLCIceberg sink info failed from json params: " + extParams, e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkListResponse.java
new file mode 100644
index 000000000..085144924
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkListResponse.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.dlc;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Response info of DLCIceberg source list
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(SinkType.SINK_DLCICEBERG)
+@ApiModel("Response of DLCIceberg sink paging list")
+public class DLCIcebergSinkListResponse extends SinkListResponse {
+
+    @ApiModelProperty("Catalog URI")
+    private String catalogUri;
+
+    @ApiModelProperty("Data warehouse")
+    private String warehouse;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty(value = "Primary key")
+    private String primaryKey;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkRequest.java
new file mode 100644
index 000000000..253a1f4b0
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlc/DLCIcebergSinkRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.dlc;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the DLCIceberg sink info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the DLCIceberg sink info")
+@JsonTypeDefine(value = SinkType.SINK_DLCICEBERG)
+public class DLCIcebergSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Catalog URI")
+    private String catalogUri;
+
+    @ApiModelProperty("Data warehouse")
+    private String warehouse;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty(value = "Primary key")
+    private String primaryKey;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/dlc/DLCIcebergSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/dlc/DLCIcebergSinkOperation.java
new file mode 100644
index 000000000..df2eddb07
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/dlc/DLCIcebergSinkOperation.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.dlc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.dlc.DLCIcebergSink;
+import org.apache.inlong.manager.common.pojo.sink.dlc.DLCIcebergSinkDTO;
+import org.apache.inlong.manager.common.pojo.sink.dlc.DLCIcebergSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.dlc.DLCIcebergSinkRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * DLCIceberg sink operation, such as save or update DLCIceberg field, etc.
+ */
+@Service
+public class DLCIcebergSinkOperation extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DLCIcebergSinkOperation.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.DLCICEBERG.equals(sinkType);
+    }
+
+    @Override
+    public StreamSink getByEntity(StreamSinkEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(this.getSinkType().equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), this.getSinkType(), existType));
+
+        StreamSink response = this.getFromEntity(entity, this::getSink);
+        List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(entity.getId());
+        List<SinkField> infos = CommonBeanUtils.copyListProperties(entities,
+                SinkField::new);
+        response.setSinkFieldList(infos);
+
+        return response;
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_DLCICEBERG.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_DLCICEBERG, existType));
+
+        DLCIcebergSinkDTO dto = DLCIcebergSinkDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+
+        return result;
+    }
+
+    @Override
+    public PageInfo<? extends SinkListResponse> getPageInfo(Page<StreamSinkEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, DLCIcebergSinkListResponse::new));
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+        Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        DLCIcebergSinkRequest dlcIcebergSinkRequest = (DLCIcebergSinkRequest) request;
+
+        try {
+            DLCIcebergSinkDTO dto = DLCIcebergSinkDTO.getFromRequest(dlcIcebergSinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            LOGGER.error("parsing json string to sink info failed", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+        }
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.SINK_DLCICEBERG;
+    }
+
+    @Override
+    protected StreamSink getSink() {
+        return new DLCIcebergSink();
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 0b0df1a32..1a471f101 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
+import org.apache.inlong.manager.common.pojo.sink.dlc.DLCIcebergSink;
 import org.apache.inlong.manager.common.pojo.sink.es.ElasticsearchSink;
 import org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSink;
 import org.apache.inlong.manager.common.pojo.sink.hbase.HBaseSink;
@@ -50,6 +51,7 @@ import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
+import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
@@ -116,6 +118,8 @@ public class LoadNodeUtils {
                 return createLoadNode((OracleSink) streamSink);
             case TDSQLPOSTGRESQL:
                 return createLoadNode((TDSQLPostgreSQLSink) streamSink);
+            case DLCICEBERG:
+                return createLoadNode((DLCIcebergSink) streamSink);
             default:
                 throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
         }
@@ -558,6 +562,37 @@ public class LoadNodeUtils {
                 tdsqlPostgreSQLSink.getPrimaryKey());
     }
 
+    /**
+     * Create load node of DLCIceberg.
+     */
+    public static DLCIcebergLoadNode createLoadNode(DLCIcebergSink dlcIcebergSink) {
+        String id = dlcIcebergSink.getSinkName();
+        String name = dlcIcebergSink.getSinkName();
+        List<SinkField> sinkFields = dlcIcebergSink.getSinkFieldList();
+        List<FieldInfo> fields = sinkFields.stream()
+                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
+                .collect(Collectors.toList());
+        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
+        Map<String, String> properties = dlcIcebergSink.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+
+        return new DLCIcebergLoadNode(
+                id,
+                name,
+                fields,
+                fieldRelationShips,
+                null,
+                null,
+                1,
+                properties,
+                dlcIcebergSink.getDbName(),
+                dlcIcebergSink.getTableName(),
+                dlcIcebergSink.getPrimaryKey(),
+                dlcIcebergSink.getCatalogUri(),
+                dlcIcebergSink.getWarehouse()
+        );
+    }
+
     /**
      * Parse information field of data sink.
      */
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
new file mode 100644
index 000000000..55df215e1
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.sink;
+
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.dlc.DLCIcebergSink;
+import org.apache.inlong.manager.common.pojo.sink.dlc.DLCIcebergSinkRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DLCIceberg stream sink service test..
+ */
+public class DLCIcebergStreamSinkServiceTest extends ServiceBaseTest {
+
+    private final String globalGroupId = "b_group1";
+    private final String globalStreamId = "stream1_dlciceberg";
+    private final String globalOperator = "admin";
+    private static final String fieldName = "dlc_field";
+    private static final String fieldType = "dlc_type";
+    private static final Integer fieldId = 1;
+    // private final String sinkName = "default";
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save sink info.
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator);
+        DLCIcebergSinkRequest sinkInfo = new DLCIcebergSinkRequest();
+        sinkInfo.setInlongGroupId(globalGroupId);
+        sinkInfo.setInlongStreamId(globalStreamId);
+        sinkInfo.setSinkType(SinkType.SINK_DLCICEBERG);
+        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        SinkField sinkField = new SinkField();
+        sinkField.setFieldName(fieldName);
+        sinkField.setFieldType(fieldType);
+        sinkField.setId(fieldId);
+        List<SinkField> sinkFieldList = new ArrayList<>();
+        sinkFieldList.add(sinkField);
+        sinkInfo.setSinkFieldList(sinkFieldList);
+
+        sinkInfo.setSinkName(sinkName);
+        sinkInfo.setId((int) (Math.random() * 100000 + 1));
+        return sinkService.save(sinkInfo, globalOperator);
+    }
+
+    @Test
+    public void testSaveAndDelete() {
+        Integer id = this.saveSink("default1");
+        Assertions.assertNotNull(id);
+        boolean result = sinkService.delete(id, globalOperator);
+        Assertions.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        Integer id = this.saveSink("default2");
+        StreamSink sink = sinkService.get(id);
+        Assertions.assertEquals(globalGroupId, sink.getInlongGroupId());
+        sinkService.delete(id, globalOperator);
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        Integer id = this.saveSink("default3");
+        StreamSink response = sinkService.get(id);
+        Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
+
+        DLCIcebergSink dlcIcebergSink = (DLCIcebergSink) response;
+        dlcIcebergSink.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        SinkRequest request = dlcIcebergSink.genSinkRequest();
+        boolean result = sinkService.update(request, globalOperator);
+        Assertions.assertTrue(result);
+    }
+}