Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/07 07:50:56 UTC

[incubator-inlong] branch master updated: [INLONG-4228][Manager] Adaptive HDFS Load Node (#4343)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bf2e14d [INLONG-4228][Manager] Adaptive HDFS Load Node (#4343)
64bf2e14d is described below

commit 64bf2e14d78286263912cf1772845fcd238182ec
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Tue Jun 7 15:50:50 2022 +0800

    [INLONG-4228][Manager] Adaptive HDFS Load Node (#4343)
---
 .../manager/common/enums/CompressFormat.java       |  53 +++++
 .../inlong/manager/common/enums/SinkType.java      |  16 +-
 .../common/pojo/sink/hdfs/HdfsPartitionField.java  |  41 ++++
 .../manager/common/pojo/sink/hdfs/HdfsSink.java    |  69 ++++++
 .../manager/common/pojo/sink/hdfs/HdfsSinkDTO.java |  93 +++++++++
 .../pojo/sink/hdfs/HdfsSinkListResponse.java       |  63 ++++++
 .../common/pojo/sink/hdfs/HdfsSinkRequest.java     |  59 ++++++
 .../service/sink/hdfs/HdfsSinkOperation.java       | 232 +++++++++++++++++++++
 .../manager/service/sort/util/LoadNodeUtils.java   | 199 +++++++++++-------
 .../core/sink/HdfsStreamSinkServiceTest.java       | 112 ++++++++++
 .../protocol/node/load/FileSystemLoadNode.java     |   1 +
 11 files changed, 860 insertions(+), 78 deletions(-)
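
For orientation before the per-file diffs: a minimal sketch of how the new
sink type is driven through the manager API, modeled on the
HdfsStreamSinkServiceTest added at the end of this commit (the service
wiring is assumed from the existing test base):

    // Hypothetical usage, mirroring the new service test below.
    HdfsSinkRequest request = new HdfsSinkRequest();
    request.setInlongGroupId("b_group_hdfs");
    request.setInlongStreamId("stream1_hdfs");
    request.setSinkType(SinkType.SINK_HDFS);
    request.setSinkName("my_hdfs_sink");
    request.setFileFormat("TextFile");
    request.setDataPath("hdfs://ip:port/usr/hive/warehouse/test.db");
    Integer sinkId = sinkService.save(request, "admin");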

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/CompressFormat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/CompressFormat.java
new file mode 100644
index 000000000..0b6944409
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/CompressFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.enums;
+
+import lombok.Getter;
+
+/**
+ * Enum of data compression formats.
+ */
+public enum CompressFormat {
+
+    NONE("none"),
+    DEFLATE("deflate"),
+    GZIP("gzip"),
+    BZIP2("bzip2"),
+    LZ4("lz4"),
+    SNAPPY("snappy");
+
+    @Getter
+    private final String name;
+
+    CompressFormat(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Get the data compression format by name (case-insensitive).
+     */
+    public static CompressFormat forName(String name) {
+        for (CompressFormat compressFormat : values()) {
+            if (compressFormat.getName().equalsIgnoreCase(name)) {
+                return compressFormat;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Unsupported CompressFormat: %s", name));
+    }
+
+}
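
As the loop above shows, forName matches case-insensitively and rejects
anything outside the six supported formats; a quick sketch:

    // Sketch: lookup ignores case; unknown names throw IllegalArgumentException.
    CompressFormat a = CompressFormat.forName("GZIP");   // CompressFormat.GZIP
    CompressFormat b = CompressFormat.forName("snappy"); // CompressFormat.SNAPPY
    // CompressFormat.forName("zstd") -> IllegalArgumentException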
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index f415ba300..49e15a801 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -21,16 +21,28 @@ import java.util.Locale;
 
 public enum SinkType {
 
-    HIVE, KAFKA, ICEBERG, CLICKHOUSE, HBASE, POSTGRES, ELASTICSEARCH, SQLSERVER;
+    HIVE,
+    KAFKA,
+    ICEBERG,
+    CLICKHOUSE,
+    HBASE,
+    POSTGRES,
+    ELASTICSEARCH,
+    SQLSERVER,
+    HDFS,
+
+    ;
 
     public static final String SINK_HIVE = "HIVE";
     public static final String SINK_KAFKA = "KAFKA";
-    public static final String SINK_HBASE = "HBASE";
     public static final String SINK_ICEBERG = "ICEBERG";
     public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
+    public static final String SINK_HBASE = "HBASE";
     public static final String SINK_POSTGRES = "POSTGRES";
     public static final String SINK_ELASTICSEARCH = "ELASTICSEARCH";
     public static final String SINK_SQLSERVER = "SQLSERVER";
+    public static final String SINK_HDFS = "HDFS";
+
     /**
      * Get the SinkType enum via the given sinkType string
      */
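
The new HDFS constant slots into the existing forType lookup (its body is
outside this hunk); judging by the java.util.Locale import, the lookup is
presumably case-insensitive, along the lines of:

    // Assumed behavior of the existing lookup, not shown in this hunk:
    SinkType type = SinkType.forType("hdfs"); // expected: SinkType.HDFS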
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsPartitionField.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsPartitionField.java
new file mode 100644
index 000000000..d72a7afdf
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsPartitionField.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.hdfs;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+/**
+ * HDFS partition field info
+ */
+@Data
+@ApiModel("Hdfs partition field")
+public class HdfsPartitionField {
+
+    @ApiModelProperty("Field name")
+    private String fieldName;
+
+    @ApiModelProperty("Field type")
+    private String fieldType;
+
+    @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
+    private String fieldFormat;
+
+}
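
Per the fieldFormat doc above, the format is only meaningful for time-typed
partition columns; a small sketch of a daily partition (values illustrative):

    // Sketch: a time partition column rolled by day.
    HdfsPartitionField dt = new HdfsPartitionField();
    dt.setFieldName("dt");
    dt.setFieldType("timestamp");
    dt.setFieldFormat("yyyy-MM-dd"); // custom format, as the doc allows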
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSink.java
new file mode 100644
index 000000000..35b9d5aea
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.hdfs;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+import java.util.List;
+
+/**
+ * HDFS sink info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "HDFS sink info")
+@JsonTypeDefine(value = SinkType.SINK_HDFS)
+public class HdfsSink extends StreamSink {
+
+    @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
+    private String fileFormat;
+
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/usr/hive/warehouse/test.db")
+    private String dataPath;
+
+    @ApiModelProperty("Data field separator")
+    private String dataSeparator;
+
+    @ApiModelProperty("Compress format")
+    private String compressFormat;
+
+    @ApiModelProperty("Server timeZone")
+    private String serverTimeZone;
+
+    @ApiModelProperty("Partition field list")
+    private List<HdfsPartitionField> partitionFieldList;
+
+    public HdfsSink() {
+        this.setSinkType(SinkType.SINK_HDFS);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, HdfsSinkRequest::new);
+    }
+}
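
genSinkRequest() copies by property name via CommonBeanUtils.copyProperties,
so the field names here must line up one-to-one with HdfsSinkRequest; a
sketch of the conversion:

    // Sketch: turning the sink POJO back into a request by bean copy.
    HdfsSink sink = new HdfsSink(); // sinkType preset to SINK_HDFS
    sink.setFileFormat("TextFile");
    sink.setDataPath("hdfs://ip:port/usr/hive/warehouse/test.db");
    sink.setCompressFormat("gzip");
    SinkRequest request = sink.genSinkRequest(); // an HdfsSinkRequest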
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkDTO.java
new file mode 100644
index 000000000..8521e8c70
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkDTO.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.hdfs;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HDFS sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class HdfsSinkDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
+    private String fileFormat;
+
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/usr/hive/warehouse/test.db")
+    private String dataPath;
+
+    @ApiModelProperty("Compress formt")
+    private String compressFormt;
+
+    @ApiModelProperty("Server timeZone")
+    private String serverTimeZone;
+
+    @ApiModelProperty("Data field separator")
+    private String dataSeparator;
+
+    @ApiModelProperty("Partition field list")
+    private List<HdfsPartitionField> partitionFieldList;
+
+    @ApiModelProperty("Properties for hbase")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static HdfsSinkDTO getFromRequest(HdfsSinkRequest request) {
+        return HdfsSinkDTO.builder()
+                .dataPath(request.getDataPath())
+                .dataSeparator(request.getDataSeparator())
+                .fileFormat(request.getFileFormat())
+                .compressFormat(request.getCompressFormat())
+                .serverTimeZone(request.getServerTimeZone())
+                .partitionFieldList(request.getPartitionFieldList())
+                .properties(request.getProperties())
+                .build();
+    }
+
+    /**
+     * Get HDFS sink info from the JSON string
+     */
+    public static HdfsSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, HdfsSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
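
This DTO is what saveOpt serializes into the sink entity's extParams column,
and getFromJson deliberately tolerates unknown properties so older rows keep
parsing after the schema grows. A sketch of the round trip (values are
illustrative):

    // Sketch: write the DTO to JSON as HdfsSinkOperation does, read it back.
    ObjectMapper mapper = new ObjectMapper();
    HdfsSinkDTO dto = HdfsSinkDTO.builder()
            .fileFormat("TextFile")
            .dataPath("hdfs://ip:port/usr/hive/warehouse/test.db")
            .compressFormat("gzip")
            .build();
    String extParams = mapper.writeValueAsString(dto);
    HdfsSinkDTO parsed = HdfsSinkDTO.getFromJson(extParams);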
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkListResponse.java
new file mode 100644
index 000000000..9700ea223
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkListResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.hdfs;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+import java.util.List;
+
+/**
+ * Response of HDFS sink list
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(SinkType.SINK_HDFS)
+@ApiModel("Response of Hdfs sink paging list")
+public class HdfsSinkListResponse extends SinkListResponse {
+
+    @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
+    private String fileFormat;
+
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/usr/hive/warehouse/test.db")
+    private String dataPath;
+
+    @ApiModelProperty("Data field separator")
+    private String dataSeparator;
+
+    @ApiModelProperty("Compress formt")
+    private String compressFormt;
+
+    @ApiModelProperty("Server timeZone")
+    private String serverTimeZone;
+
+    @ApiModelProperty("Partition field list")
+    private List<HdfsPartitionField> partitionFieldList;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkRequest.java
new file mode 100644
index 000000000..9f87ae9c1
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HdfsSinkRequest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.hdfs;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+import java.util.List;
+
+/**
+ * Request of the HDFS sink
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the Kafka sink info")
+@JsonTypeDefine(value = SinkType.SINK_HDFS)
+public class HdfsSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
+    private String fileFormat;
+
+    @ApiModelProperty("Data path, such as: hdfs://ip:port/usr/hive/warehouse/test.db")
+    private String dataPath;
+
+    @ApiModelProperty("Compress formt")
+    private String compressFormt;
+
+    @ApiModelProperty("Server timeZone")
+    private String serverTimeZone;
+
+    @ApiModelProperty("Data field separator")
+    private String dataSeparator;
+
+    @ApiModelProperty("Partition field list")
+    private List<HdfsPartitionField> partitionFieldList;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HdfsSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HdfsSinkOperation.java
new file mode 100644
index 000000000..77e23f0fe
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HdfsSinkOperation.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.hdfs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSink;
+import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSinkDTO;
+import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSinkRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.sink.StreamSinkOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * HDFS sink operation
+ */
+@Service
+public class HdfsSinkOperation implements StreamSinkOperation {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsSinkOperation.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private StreamSinkEntityMapper sinkMapper;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.HDFS.equals(sinkType);
+    }
+
+    @Override
+    public Integer saveOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_HDFS.equals(sinkType),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
+
+        HdfsSinkRequest hdfsSinkRequest = (HdfsSinkRequest) request;
+        StreamSinkEntity entity = CommonBeanUtils.copyProperties(hdfsSinkRequest, StreamSinkEntity::new);
+        entity.setStatus(SinkStatus.NEW.getCode());
+        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
+
+        // get the ext params
+        HdfsSinkDTO dto = HdfsSinkDTO.getFromRequest(hdfsSinkRequest);
+        try {
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED);
+        }
+        sinkMapper.insert(entity);
+        Integer sinkId = entity.getId();
+        request.setId(sinkId);
+        this.saveFieldOpt(request);
+        return sinkId;
+    }
+
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getFieldList();
+        LOGGER.info("begin to save hdfs field={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save hdfs field");
+    }
+
+    @Override
+    public StreamSink getByEntity(@NotNull StreamSinkEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_HDFS.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HDFS, existType));
+        StreamSink response = this.getFromEntity(entity, HdfsSink::new);
+        List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(entity.getId());
+        List<SinkField> infos = CommonBeanUtils.copyListProperties(entities, SinkField::new);
+        response.setFieldList(infos);
+        return response;
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_HDFS.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HDFS, existType));
+
+        HdfsSinkDTO dto = HdfsSinkDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+
+        return result;
+    }
+
+    @Override
+    public PageInfo<? extends SinkListResponse> getPageInfo(Page<StreamSinkEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, HdfsSinkListResponse::new));
+    }
+
+    @Override
+    public void updateOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_HDFS.equals(sinkType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HDFS, sinkType));
+
+        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        HdfsSinkRequest hdfsRequest = (HdfsSinkRequest) request;
+        CommonBeanUtils.copyProperties(hdfsRequest, entity, true);
+        try {
+            HdfsSinkDTO dto = HdfsSinkDTO.getFromRequest(hdfsRequest);
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(SinkStatus.CONFIG_ING.getCode());
+        entity.setModifier(operator);
+        entity.setModifyTime(new Date());
+        sinkMapper.updateByPrimaryKeySelective(entity);
+
+        boolean onlyAdd = SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
+        this.updateFieldOpt(onlyAdd, hdfsRequest);
+        LOGGER.info("success to update sink of type={}", sinkType);
+    }
+
+    @Override
+    public void updateFieldOpt(Boolean onlyAdd, SinkRequest request) {
+        Integer sinkId = request.getId();
+        List<SinkField> fieldRequestList = request.getFieldList();
+        if (CollectionUtils.isEmpty(fieldRequestList)) {
+            return;
+        }
+
+        if (onlyAdd) {
+            List<StreamSinkFieldEntity> existsFieldList = sinkFieldMapper.selectBySinkId(sinkId);
+            if (existsFieldList.size() > fieldRequestList.size()) {
+                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+            }
+            for (int i = 0; i < existsFieldList.size(); i++) {
+                if (!existsFieldList.get(i).getFieldName().equals(fieldRequestList.get(i).getFieldName())) {
+                    throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+                }
+            }
+        }
+
+        // First physically delete the existing fields
+        sinkFieldMapper.deleteAll(sinkId);
+        // Then batch save the sink fields
+        this.saveFieldOpt(request);
+
+        LOGGER.info("success to update field");
+    }
+}
+
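
Note the append-only rule in updateFieldOpt: once a sink has reached
CONFIG_SUCCESSFUL, an update may append trailing fields but may not drop or
rename existing ones. The same check, restated as a standalone sketch with
invented field names:

    // Same shape as the onlyAdd check above: size plus positional name match.
    List<String> existing  = Arrays.asList("f1", "f2");
    List<String> requested = Arrays.asList("f1", "f2", "f3");
    boolean allowed = requested.size() >= existing.size();
    for (int i = 0; allowed && i < existing.size(); i++) {
        allowed = existing.get(i).equals(requested.get(i));
    }
    // allowed == true; ["f1"] or ["f1", "f2x"] would be rejected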
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 8b9ae2304..6ce8cab49 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.common.pojo.sink.es.ElasticsearchSink;
 import org.apache.inlong.manager.common.pojo.sink.hbase.HBaseSink;
+import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSink;
 import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
 import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSink;
@@ -46,6 +47,7 @@ import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
@@ -75,7 +77,7 @@ public class LoadNodeUtils {
     }
 
     /**
-     * Create node of data load.
+     * Create load node from the stream sink info.
      */
     public static LoadNode createLoadNode(StreamSink streamSink) {
         SinkType sinkType = SinkType.forType(streamSink.getSinkType());
@@ -96,20 +98,19 @@ public class LoadNodeUtils {
                 return createLoadNode((SqlServerSink) streamSink);
             case ELASTICSEARCH:
                 return createLoadNode((ElasticsearchSink) streamSink);
+            case HDFS:
+                return createLoadNode((HdfsSink) streamSink);
             default:
-                throw new IllegalArgumentException(
-                        String.format("Unsupported sinkType=%s to create loadNode", sinkType));
+                throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
         }
     }
 
     /**
-     * Create node of data load about kafka.
+     * Create load node of Kafka.
      */
     public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink) {
         String id = kafkaSink.getSinkName();
         String name = kafkaSink.getSinkName();
-        String topicName = kafkaSink.getTopicName();
-        String bootstrapServers = kafkaSink.getBootstrapServers();
         List<SinkField> fieldList = kafkaSink.getFieldList();
         List<FieldInfo> fieldInfos = fieldList.stream()
                 .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, name))
@@ -140,33 +141,31 @@ public class LoadNodeUtils {
                 format = new DebeziumJsonFormat();
                 break;
             default:
-                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
+                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
-        String primaryKey = kafkaSink.getPrimaryKey();
-        return new KafkaLoadNode(id,
+
+        return new KafkaLoadNode(
+                id,
                 name,
                 fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
-                topicName,
-                bootstrapServers,
+                kafkaSink.getTopicName(),
+                kafkaSink.getBootstrapServers(),
                 format,
                 sinkParallelism,
                 properties,
-                primaryKey);
+                kafkaSink.getPrimaryKey()
+        );
     }
 
     /**
-     * Create node of data load about hive.
+     * Create load node of Hive.
      */
     public static HiveLoadNode createLoadNode(HiveSink hiveSink) {
         String id = hiveSink.getSinkName();
         String name = hiveSink.getSinkName();
-        String database = hiveSink.getDbName();
-        String tableName = hiveSink.getTableName();
-        String hiveConfDir = hiveSink.getHiveConfDir();
-        String hiveVersion = hiveSink.getHiveVersion();
         List<SinkField> fieldList = hiveSink.getFieldList();
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
@@ -177,9 +176,9 @@ public class LoadNodeUtils {
         List<FieldInfo> partitionFields = Lists.newArrayList();
         if (CollectionUtils.isNotEmpty(hiveSink.getPartitionFieldList())) {
             partitionFields = hiveSink.getPartitionFieldList().stream()
-                    .map(hivePartitionField -> new FieldInfo(hivePartitionField.getFieldName(), name,
-                            FieldInfoUtils.convertFieldFormat(hivePartitionField.getFieldType(),
-                                    hivePartitionField.getFieldFormat()))).collect(Collectors.toList());
+                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), name,
+                            FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
+                                    partitionField.getFieldFormat()))).collect(Collectors.toList());
         }
         return new HiveLoadNode(
                 id,
@@ -191,17 +190,17 @@ public class LoadNodeUtils {
                 null,
                 properties,
                 null,
-                database,
-                tableName,
-                hiveConfDir,
-                hiveVersion,
+                hiveSink.getDbName(),
+                hiveSink.getTableName(),
+                hiveSink.getHiveConfDir(),
+                hiveSink.getHiveVersion(),
                 null,
                 partitionFields
         );
     }
 
     /**
-     * Create hbase load node from response.
+     * Create load node of HBase.
      */
     public static HbaseLoadNode createLoadNode(HBaseSink hbaseSink) {
         String id = hbaseSink.getSinkName();
@@ -234,7 +233,7 @@ public class LoadNodeUtils {
     }
 
     /**
-     * Create postgres load node
+     * Create load node of PostgreSQL.
      */
     public static PostgresLoadNode createLoadNode(PostgresSink postgresSink) {
         List<SinkField> fieldList = postgresSink.getFieldList();
@@ -243,17 +242,26 @@ public class LoadNodeUtils {
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
         List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        return new PostgresLoadNode(postgresSink.getSinkName(),
-                postgresSink.getSinkName(),
-                fields, fieldRelations, null, null, 1,
-                null, postgresSink.getJdbcUrl(), postgresSink.getUsername(),
+
+        return new PostgresLoadNode(
+                name,
+                name,
+                fields,
+                fieldRelations,
+                null,
+                null,
+                1,
+                null,
+                postgresSink.getJdbcUrl(),
+                postgresSink.getUsername(),
                 postgresSink.getPassword(),
                 postgresSink.getDbName() + "." + postgresSink.getTableName(),
-                postgresSink.getPrimaryKey());
+                postgresSink.getPrimaryKey()
+        );
     }
 
     /**
-     * Create ClickHouse load node
+     * Create load node of ClickHouse.
      */
     public static ClickHouseLoadNode createLoadNode(ClickHouseSink ckSink) {
         List<SinkField> sinkFields = ckSink.getFieldList();
@@ -262,27 +270,30 @@ public class LoadNodeUtils {
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
         List<FieldRelation> fieldRelations = parseSinkFields(sinkFields, name);
-        return new ClickHouseLoadNode(name, name,
-                fields, fieldRelations, null, null, 1,
-                null, ckSink.getTableName(),
+
+        return new ClickHouseLoadNode(
+                name,
+                name,
+                fields,
+                fieldRelations,
+                null,
+                null,
+                1,
+                null,
+                ckSink.getTableName(),
                 ckSink.getJdbcUrl(),
                 ckSink.getUsername(),
-                ckSink.getPassword());
+                ckSink.getPassword()
+        );
     }
 
     /**
-     * Create iceberg load node
+     * Create load node of Iceberg.
      */
     public static IcebergLoadNode createLoadNode(IcebergSink icebergSink) {
         String id = icebergSink.getSinkName();
         String name = icebergSink.getSinkName();
-        String dbName = icebergSink.getDbName();
-        String tableName = icebergSink.getTableName();
-        String uri = icebergSink.getCatalogUri();
-        String warehouse = icebergSink.getWarehouse();
-        String primaryKey = icebergSink.getPrimaryKey();
         CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
-
         List<SinkField> sinkFields = icebergSink.getFieldList();
         List<FieldInfo> fields = sinkFields.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
@@ -290,26 +301,31 @@ public class LoadNodeUtils {
         List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
         Map<String, String> properties = icebergSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+
         return new IcebergLoadNode(
-                id, name, fields, fieldRelationShips, null, null, 1, properties,
-                dbName, tableName, primaryKey, catalogType, uri, warehouse);
+                id,
+                name,
+                fields,
+                fieldRelationShips,
+                null,
+                null,
+                1,
+                properties,
+                icebergSink.getDbName(),
+                icebergSink.getTableName(),
+                icebergSink.getPrimaryKey(),
+                catalogType,
+                icebergSink.getCatalogUri(),
+                icebergSink.getWarehouse()
+        );
     }
 
     /**
-     * Create SqlServer load node based on SqlServerSink
-     *
-     * @param sqlServerSink SqlServer sink info
-     * @return SqlServer load node info
+     * Create load node of SqlServer.
      */
     public static SqlServerLoadNode createLoadNode(SqlServerSink sqlServerSink) {
         final String id = sqlServerSink.getSinkName();
         final String name = sqlServerSink.getSinkName();
-        final String primaryKey = sqlServerSink.getPrimaryKey();
-        final String jdbcUrl = sqlServerSink.getJdbcUrl();
-        final String userName = sqlServerSink.getUsername();
-        final String password = sqlServerSink.getPassword();
-        final String schemaName = sqlServerSink.getSchemaName();
-        final String tablename = sqlServerSink.getTableName();
         final List<SinkField> fieldList = sqlServerSink.getFieldList();
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
@@ -317,6 +333,7 @@ public class LoadNodeUtils {
         List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
         Map<String, String> properties = sqlServerSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+
         return new SqlServerLoadNode(
                 id,
                 name,
@@ -326,12 +343,12 @@ public class LoadNodeUtils {
                 null,
                 null,
                 properties,
-                jdbcUrl,
-                userName,
-                password,
-                schemaName,
-                tablename,
-                primaryKey
+                sqlServerSink.getJdbcUrl(),
+                sqlServerSink.getUsername(),
+                sqlServerSink.getPassword(),
+                sqlServerSink.getSchemaName(),
+                sqlServerSink.getTableName(),
+                sqlServerSink.getPrimaryKey()
         );
     }
 
@@ -341,14 +358,6 @@ public class LoadNodeUtils {
     public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsearchSink) {
         final String id = elasticsearchSink.getSinkName();
         final String name = elasticsearchSink.getSinkName();
-        final String userName = elasticsearchSink.getUsername();
-        final String password = elasticsearchSink.getPassword();
-        final String index = elasticsearchSink.getIndexName();
-        final String host = elasticsearchSink.getHost();
-        final String documentType = elasticsearchSink.getDocumentType();
-        final String promaryKey = elasticsearchSink.getPrimaryKey();
-        final int version =  elasticsearchSink.getVersion().intValue();
-
         final List<SinkField> fieldList = elasticsearchSink.getFieldList();
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
@@ -356,6 +365,7 @@ public class LoadNodeUtils {
         List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
         Map<String, String> properties = elasticsearchSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+
         return new ElasticsearchLoadNode(
                 id,
                 name,
@@ -365,13 +375,50 @@ public class LoadNodeUtils {
                 null,
                 null,
                 properties,
-                index,
-                host,
-                userName,
-                password,
-                documentType,
-                promaryKey,
-                version
+                elasticsearchSink.getIndexName(),
+                elasticsearchSink.getHost(),
+                elasticsearchSink.getUsername(),
+                elasticsearchSink.getPassword(),
+                elasticsearchSink.getDocumentType(),
+                elasticsearchSink.getPrimaryKey(),
+                elasticsearchSink.getVersion()
+        );
+    }
+
+    /**
+     * Create load node of HDFS.
+     */
+    public static FileSystemLoadNode createLoadNode(HdfsSink hdfsSink) {
+        String id = hdfsSink.getSinkName();
+        String name = hdfsSink.getSinkName();
+        List<SinkField> fieldList = hdfsSink.getFieldList();
+        List<FieldInfo> fields = fieldList.stream()
+                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
+                .collect(Collectors.toList());
+        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
+        Map<String, String> properties = hdfsSink.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        List<FieldInfo> partitionFields = Lists.newArrayList();
+        if (CollectionUtils.isNotEmpty(hdfsSink.getPartitionFieldList())) {
+            partitionFields = hdfsSink.getPartitionFieldList().stream()
+                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), name,
+                            FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
+                                    partitionField.getFieldFormat())))
+                    .collect(Collectors.toList());
+        }
+
+        return new FileSystemLoadNode(
+                id,
+                name,
+                fields,
+                fieldRelations,
+                Lists.newArrayList(),
+                hdfsSink.getDataPath(),
+                hdfsSink.getFileFormat(),
+                null,
+                properties,
+                partitionFields,
+                hdfsSink.getServerTimeZone()
         );
     }
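
With the HDFS branch wired into the switch at the top of this file, turning
a configured HDFS sink into a Sort protocol node goes through the same
generic entry point as every other sink; a sketch:

    // Sketch: dispatch selects the HDFS branch and yields a FileSystemLoadNode
    // carrying dataPath, fileFormat, partition fields, and serverTimeZone.
    StreamSink streamSink = sinkService.get(sinkId); // returns an HdfsSink
    LoadNode node = LoadNodeUtils.createLoadNode(streamSink);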
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
new file mode 100644
index 000000000..26413ea0f
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.sink;
+
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSink;
+import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSinkRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HdfsStreamSinkServiceTest extends ServiceBaseTest {
+
+    private static final String globalGroupId = "b_group_hdfs";
+    private static final String globalStreamId = "stream1_hdfs";
+    private static final String globalOperator = "admin";
+    private static final String fileFormat = "TextFile";
+    private static final String dataPath = "hdfs://ip:port/usr/hive/warehouse/test.db";
+    private static final String serverTimeZone = "GMT%2b8";
+    private static final String fieldName = "hdfs_field";
+    private static final String fieldType = "hdfs_type";
+    private static final Integer fieldId = 1;
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save sink info.
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator);
+
+        HdfsSinkRequest hdfsSinkRequest = new HdfsSinkRequest();
+        hdfsSinkRequest.setInlongGroupId(globalGroupId);
+        hdfsSinkRequest.setInlongStreamId(globalStreamId);
+        hdfsSinkRequest.setSinkType(SinkType.SINK_HDFS);
+        hdfsSinkRequest.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        hdfsSinkRequest.setSinkName(sinkName);
+        hdfsSinkRequest.setFileFormat(fileFormat);
+        hdfsSinkRequest.setDataPath(dataPath);
+        hdfsSinkRequest.setServerTimeZone(serverTimeZone);
+        SinkField sinkField = new SinkField();
+        sinkField.setFieldName(fieldName);
+        sinkField.setFieldType(fieldType);
+        sinkField.setId(fieldId);
+        List<SinkField> sinkFieldList = new ArrayList<>();
+        sinkFieldList.add(sinkField);
+        hdfsSinkRequest.setFieldList(sinkFieldList);
+        return sinkService.save(hdfsSinkRequest, globalOperator);
+    }
+
+    /**
+     * Delete HDFS sink info by sink id.
+     */
+    public void deleteHdfsSink(Integer hdfsSinkId) {
+        boolean result = sinkService.delete(hdfsSinkId, globalOperator);
+        // Verify that the deletion was successful
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        Integer hdfsSinkId = this.saveSink("default_hdfs");
+        StreamSink sink = sinkService.get(hdfsSinkId);
+        // verify globalGroupId
+        Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
+        deleteHdfsSink(hdfsSinkId);
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        Integer hdfsSinkId = this.saveSink("default_hdfs");
+        StreamSink response = sinkService.get(hdfsSinkId);
+        Assert.assertEquals(globalGroupId, response.getInlongGroupId());
+
+        HdfsSink hdfsSink = (HdfsSink) response;
+        hdfsSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+
+        HdfsSinkRequest request = CommonBeanUtils.copyProperties(hdfsSink, HdfsSinkRequest::new);
+        boolean result = sinkService.update(request, globalOperator);
+        Assert.assertTrue(result);
+        deleteHdfsSink(hdfsSinkId);
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
index 8839cceb3..31224ab1f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
@@ -61,6 +61,7 @@ public class FileSystemLoadNode extends LoadNode implements Serializable {
     @JsonProperty("partitionFields")
     private List<FieldInfo> partitionFields;
 
+    @JsonProperty("tempTableName")
     private String tempTableName;
 
     @JsonProperty("serverTimeZone")
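
The one-line change above brings tempTableName in line with its annotated
siblings: the node classes appear to bind JSON properties explicitly, so
without @JsonProperty the field would be skipped when the node is serialized
as part of the Sort protocol. A hedged sketch (mapper configuration assumed):

    // Sketch: with the annotation, tempTableName appears in the node's JSON.
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(node); // now includes "tempTableName"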