You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/30 03:08:40 UTC

[incubator-inlong] branch master updated: [INLONG-3428][Manager] Set the default value and refactor some methods (#3447)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f970db  [INLONG-3428][Manager] Set the default value and refactor some methods (#3447)
2f970db is described below

commit 2f970db82117331e534decb504c972d164e3582c
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 30 11:08:33 2022 +0800

    [INLONG-3428][Manager] Set the default value and refactor some methods (#3447)
---
 .../apache/inlong/common/enums/DataTypeEnum.java   |   2 +-
 .../inlong/common/pojo/agent/TaskResult.java       |   4 +
 .../inlong/manager/client/api/StreamField.java     |   4 +-
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   8 +-
 .../{SinkFieldRequest.java => SinkFieldBase.java}  |  16 +--
 .../manager/common/pojo/sink/SinkFieldRequest.java |  33 +-----
 .../common/pojo/sink/SinkFieldResponse.java        |  34 +-----
 .../common/pojo/sink/hive/HivePartitionField.java  |  11 +-
 .../pojo/source/binlog/BinlogSourceRequest.java    |   2 +-
 .../common/pojo/stream/InlongStreamFieldInfo.java  |   4 +-
 .../dao/mapper/InlongStreamFieldEntityMapper.java  |   2 -
 .../mappers/InlongStreamFieldEntityMapper.xml      |  16 ---
 .../service/sink/hive/HiveStreamSinkOperation.java |  29 +----
 .../source/AbstractStreamSourceOperation.java      |   8 +-
 .../thirdparty/hive/DefaultHiveTableOperator.java  |   3 +-
 .../thirdparty/sort/util/SerializationUtils.java   |   3 +-
 .../thirdparty/sort/util/SinkInfoUtils.java        | 122 ++++++++-------------
 .../main/resources/sql/apache_inlong_manager.sql   |   4 +-
 .../manager-web/sql/apache_inlong_manager.sql      |   4 +-
 19 files changed, 90 insertions(+), 219 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 1916a87..8eeb643 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -28,7 +28,7 @@ public enum DataTypeEnum {
     DEBEZIUM_JSON("debezium_json");
 
     @Getter
-    private String name;
+    private final String name;
 
     DataTypeEnum(String name) {
         this.name = name;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index bb746a2..878a65d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -17,8 +17,10 @@
 
 package org.apache.inlong.common.pojo.agent;
 
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.util.List;
 
@@ -27,6 +29,8 @@ import java.util.List;
  */
 @Data
 @Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class TaskResult {
 
     private List<CmdConfig> cmdConfigs;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
index e1c51ca..ba08425 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
@@ -48,8 +48,8 @@ public class StreamField {
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
     private Integer isMetaField = 0;
 
-    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
-            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+    @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
     private String fieldFormat;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 8ae3d57..ff2ba5e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -87,11 +87,9 @@ public enum ErrorCodeEnum {
     SINK_TB_NAME_UPDATE_NOT_ALLOWED(1409, "Current status does not allow modification the table name"),
     SINK_FIELD_UPDATE_NOT_ALLOWED(1410, "Current status not allowed to modification/delete field"),
     SINK_FIELD_LIST_IS_EMPTY(1411, "Sink field list is not allow empty"),
-    SINK_PARTITION_FIELD_NAME_IS_EMPTY(1412, "Sink partition field name is not allow empty"),
-    SINK_PARTITION_FIELD_NOT_FOUND_IN_SINK_FIELD_LIST(1413,
-            "Sink partition field is not found in sink field list"),
-    SOURCE_FIELD_NAME_OF_SINK_PARTITION_FIELD_IS_EMPTY(1414,
-            "Sink field's source field name is not allowed empty when the field is a partition field"),
+    PARTITION_FIELD_NAME_IS_EMPTY(1412, "Partition field name cannot be empty"),
+    PARTITION_FIELD_NOT_FOUND(1413, "Sink partition field [%s] not found in sink field list"),
+    PARTITION_FIELD_NO_SOURCE_FIELD(1414, "Sink partition field [%s] must have a related source field name"),
 
 
     WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
similarity index 74%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
index d4a7073..a737e40 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
@@ -22,11 +22,11 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
 /**
- * Sink field info
+ * Sink field base info
  */
 @Data
-@ApiModel("Sink field info")
-public class SinkFieldRequest {
+@ApiModel("Sink field base info")
+public class SinkFieldBase {
 
     @ApiModelProperty("Field name")
     private String fieldName;
@@ -37,20 +37,14 @@ public class SinkFieldRequest {
     @ApiModelProperty("Field comment")
     private String fieldComment;
 
-    @ApiModelProperty("Required or not, 0: no need, 1: required")
-    private Integer isRequired;
-
     @ApiModelProperty("Source field name")
     private String sourceFieldName;
 
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
-    @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
-    private Integer isMetaField = 0;
-
-    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
-            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+    @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
     private String fieldFormat;
 
     @ApiModelProperty("Field order")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index d4a7073..a0a5119 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -20,40 +20,17 @@ package org.apache.inlong.manager.common.pojo.sink;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 /**
- * Sink field info
+ * Sink field request.
  */
 @Data
-@ApiModel("Sink field info")
-public class SinkFieldRequest {
-
-    @ApiModelProperty("Field name")
-    private String fieldName;
-
-    @ApiModelProperty("Field type")
-    private String fieldType;
-
-    @ApiModelProperty("Field comment")
-    private String fieldComment;
-
-    @ApiModelProperty("Required or not, 0: no need, 1: required")
-    private Integer isRequired;
-
-    @ApiModelProperty("Source field name")
-    private String sourceFieldName;
-
-    @ApiModelProperty("Source field type")
-    private String sourceFieldType;
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Sink field request")
+public class SinkFieldRequest extends SinkFieldBase {
 
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
     private Integer isMetaField = 0;
 
-    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
-            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
-    private String fieldFormat;
-
-    @ApiModelProperty("Field order")
-    private Short rankNum;
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index cfabcfc..7ad5eec 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -20,45 +20,19 @@ package org.apache.inlong.manager.common.pojo.sink;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 /**
  * Sink field response
  */
 @Data
+@EqualsAndHashCode(callSuper = true)
 @ApiModel("Sink field response")
-public class SinkFieldResponse {
+public class SinkFieldResponse extends SinkFieldBase {
 
     private Integer id;
 
-    @ApiModelProperty("Sink ID")
-    private Integer sinkId;
-
-    @ApiModelProperty("Field name")
-    private String fieldName;
-
-    @ApiModelProperty("Field type")
-    private String fieldType;
-
-    @ApiModelProperty("Field comment")
-    private String fieldComment;
-
-    @ApiModelProperty("Required or not, 0: no need, 1: required")
-    private Integer isRequired;
-
-    @ApiModelProperty("Source field name")
-    private String sourceFieldName;
-
-    @ApiModelProperty("Source field type")
-    private String sourceFieldType;
-
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
-    private Integer isMetaField = 0;
-
-    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
-            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
-    private String fieldFormat;
-
-    @ApiModelProperty("Field order")
-    private Short rankNum;
+    private Integer isMetaField;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java
index 678bd77..d774af2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
 /**
- * Sink partition field info
+ * Hive partition field info
  */
 @Data
 @ApiModel("Hive partition field")
@@ -34,13 +34,8 @@ public class HivePartitionField {
     @ApiModelProperty("Field type")
     private String fieldType;
 
-    @ApiModelProperty("Field comment")
-    private String fieldComment;
-
-    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
-            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+    @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
     private String fieldFormat;
 
-    @ApiModelProperty("Field order")
-    private Short rankNum = 0;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index cdb1e2f..4af7441 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -84,7 +84,7 @@ public class BinlogSourceRequest extends SourceRequest {
      * generally not used.
      */
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
-    private String snapshotMode;
+    private String snapshotMode = "initial";
 
     @ApiModelProperty("The file path to store offset info")
     private String offsetFilename;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
index d220267..d991e0b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
@@ -57,8 +57,8 @@ public class InlongStreamFieldInfo {
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
     private Integer isMetaField = 0;
 
-    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
-            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+    @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
     private String fieldFormat;
 
     @ApiModelProperty(value = "field rank num")
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
index fb20c11..c598c31 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
@@ -38,8 +38,6 @@ public interface InlongStreamFieldEntityMapper {
     List<InlongStreamFieldEntity> selectFields(@Param("groupId") String groupId,
             @Param("streamId") String streamId);
 
-    int updateByPrimaryKey(InlongStreamFieldEntity record);
-
     int deleteByPrimaryKey(Integer id);
 
     /**
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index ab354cb..1a7d585 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -110,22 +110,6 @@
           and field.is_predefined_field = 0
     </select>
 
-    <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
-        update inlong_stream_field
-        set inlong_group_id     = #{inlongGroupId,jdbcType=VARCHAR},
-            inlong_stream_id    = #{inlongStreamId,jdbcType=VARCHAR},
-            is_predefined_field = #{isPredefinedField,jdbcType=INTEGER},
-            field_name          = #{fieldName,jdbcType=VARCHAR},
-            field_value         = #{fieldValue,jdbcType=VARCHAR},
-            pre_expression      = #{preExpression,jdbcType=VARCHAR},
-            field_type          = #{fieldType,jdbcType=VARCHAR},
-            is_meta_field       = #(isMetaField,jdbcType=SMALLINT),
-            field_format        = #{fieldFormat,jdbcType=VARCHAR},
-            field_comment       = #{fieldComment,jdbcType=VARCHAR},
-            rank_num            = #{rankNum,jdbcType=SMALLINT},
-            is_deleted          = #{isDeleted,jdbcType=INTEGER}
-        where id = #{id,jdbcType=INTEGER}
-    </update>
     <update id="logicDeleteAllByIdentifier">
         update inlong_stream_field
         set is_deleted = 1
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
index d36b59e..8b3e6d8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
-import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkDTO;
@@ -44,6 +43,7 @@ import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.sink.StreamSinkOperation;
+import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -52,9 +52,7 @@ import org.springframework.stereotype.Service;
 import javax.validation.constraints.NotNull;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Supplier;
 
 /**
@@ -83,7 +81,7 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
         Preconditions.checkTrue(Constant.SINK_HIVE.equals(sinkType),
                 ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
         HiveSinkRequest hiveRequest = (HiveSinkRequest) request;
-        checkPartitionField(hiveRequest);
+        SinkInfoUtils.checkPartitionField(hiveRequest.getFieldList(), hiveRequest.getPartitionFieldList());
         StreamSinkEntity entity = CommonBeanUtils.copyProperties(hiveRequest, StreamSinkEntity::new);
         entity.setStatus(EntityStatus.SINK_NEW.getCode());
         entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
@@ -108,27 +106,6 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
         return sinkId;
     }
 
-    private void checkPartitionField(HiveSinkRequest hiveRequest) {
-        if (CollectionUtils.isNotEmpty(hiveRequest.getPartitionFieldList())) {
-            Preconditions.checkNotEmpty(hiveRequest.getFieldList(),
-                    String.format("%s:%s",
-                            ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY.getMessage(), hiveRequest.getSinkType()));
-            Map<String, SinkFieldRequest> sinkFieldMap = new HashMap<>(hiveRequest.getFieldList().size());
-            hiveRequest.getFieldList().forEach(s -> sinkFieldMap.put(s.getFieldName(), s));
-            for (HivePartitionField partitionFieldInfo : hiveRequest.getPartitionFieldList()) {
-                Preconditions.checkNotEmpty(partitionFieldInfo.getFieldName(), String.format("%s:%s",
-                        ErrorCodeEnum.SINK_PARTITION_FIELD_NAME_IS_EMPTY.getMessage(), hiveRequest.getSinkType()));
-                SinkFieldRequest sinkFieldRequest = sinkFieldMap.get(partitionFieldInfo.getFieldName());
-                Preconditions.checkNotNull(sinkFieldRequest, String.format("%s:%s",
-                        ErrorCodeEnum.SINK_PARTITION_FIELD_NOT_FOUND_IN_SINK_FIELD_LIST.getMessage(),
-                        hiveRequest.getSinkType()));
-                Preconditions.checkNotEmpty(sinkFieldRequest.getSourceFieldName(), String.format("%s:%s",
-                        ErrorCodeEnum.SOURCE_FIELD_NAME_OF_SINK_PARTITION_FIELD_IS_EMPTY.getMessage(),
-                        hiveRequest.getSinkType()));
-            }
-        }
-    }
-
     @Override
     public void saveFieldOpt(SinkRequest request) {
         List<SinkFieldRequest> fieldList = request.getFieldList();
@@ -212,7 +189,7 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         HiveSinkRequest hiveRequest = (HiveSinkRequest) request;
-        checkPartitionField(hiveRequest);
+        SinkInfoUtils.checkPartitionField(hiveRequest.getFieldList(), hiveRequest.getPartitionFieldList());
         CommonBeanUtils.copyProperties(hiveRequest, entity, true);
         try {
             HiveSinkDTO dto = HiveSinkDTO.getFromRequest(hiveRequest);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index d709b9d..8c1dbbc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -119,7 +119,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         if (!SourceState.ALLOWED_UPDATE.contains(entity.getStatus())) {
-            throw new RuntimeException(String.format("Source=%s is not allowed to update, "
+            throw new BusinessException(String.format("Source=%s is not allowed to update, "
                     + "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
         }
         // Setting updated parameters of stream source entity.
@@ -138,7 +138,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
         SourceState curState = SourceState.forCode(existEntity.getStatus());
         SourceState nextState = SourceState.TO_BE_ISSUED_FROZEN;
         if (!SourceState.isAllowedTransition(curState, nextState)) {
-            throw new RuntimeException(String.format("Source=%s is not allowed to stop", existEntity));
+            throw new BusinessException(String.format("Source=%s is not allowed to stop", existEntity));
         }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
         curEntity.setVersion(existEntity.getVersion() + 1);
@@ -155,7 +155,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
         SourceState curState = SourceState.forCode(existEntity.getStatus());
         SourceState nextState = SourceState.TO_BE_ISSUED_ACTIVE;
         if (!SourceState.isAllowedTransition(curState, nextState)) {
-            throw new RuntimeException(String.format("Source=%s is not allowed to restart", existEntity));
+            throw new BusinessException(String.format("Source=%s is not allowed to restart", existEntity));
         }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
         curEntity.setVersion(existEntity.getVersion() + 1);
@@ -179,7 +179,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
             nextState = SourceState.SOURCE_DISABLE;
         }
         if (!SourceState.isAllowedTransition(curState, nextState)) {
-            throw new RuntimeException(String.format("Source=%s is not allowed to delete", existEntity));
+            throw new BusinessException(String.format("Source=%s is not allowed to delete", existEntity));
         }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
         curEntity.setVersion(existEntity.getVersion() + 1);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java
index 02de742..98dcd9d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java
@@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
+
 import static java.util.stream.Collectors.toList;
 
 /**
@@ -131,7 +131,6 @@ public class DefaultHiveTableOperator implements IHiveTableOperator {
 
         // Set partition fields
         if (CollectionUtils.isNotEmpty(hiveInfo.getPartitionFieldList())) {
-            hiveInfo.getPartitionFieldList().sort(Comparator.comparing(HivePartitionField::getRankNum));
             for (HivePartitionField field : hiveInfo.getPartitionFieldList()) {
                 HiveColumnQueryBean columnBean = new HiveColumnQueryBean();
                 columnBean.setColumnName(field.getFieldName());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index b4d4a52..cea3110 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -135,8 +135,7 @@ public class SerializationUtils {
     /**
      * Get deserialization info for File
      */
-    private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse,
-            InlongStreamInfo streamInfo) {
+    private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse, InlongStreamInfo streamInfo) {
         String serializationType = sourceResponse.getSerializationType();
         DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
         switch (dataType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 8b8001c..13bf46c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -23,14 +23,14 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.FileFormat;
 import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
-import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldBase;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
@@ -44,28 +44,16 @@ import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class SinkInfoUtils {
 
-    private static final Map<String, String> PARTITION_TIME_FORMAT_MAP = new HashMap<>();
-
-    private static final Map<String, TimeUnit> PARTITION_TIME_UNIT_MAP = new HashMap<>();
-
-    static {
-        PARTITION_TIME_FORMAT_MAP.put("D", "yyyyMMdd");
-        PARTITION_TIME_FORMAT_MAP.put("H", "yyyyMMddHH");
-        PARTITION_TIME_FORMAT_MAP.put("I", "yyyyMMddHHmm");
-
-        PARTITION_TIME_UNIT_MAP.put("D", TimeUnit.DAYS);
-        PARTITION_TIME_UNIT_MAP.put("H", TimeUnit.HOURS);
-        PARTITION_TIME_UNIT_MAP.put("I", TimeUnit.MINUTES);
-    }
+    private static final String DATA_FORMAT = "yyyyMMddHH";
+    private static final String TIME_FORMAT = "HHmmss";
+    private static final String DATA_TIME_FORMAT = "yyyyMMddHHmmss";
 
     /**
      * Create sink info for DataFlowInfo.
@@ -153,59 +141,29 @@ public class SinkInfoUtils {
         } else {
             fileFormat = new HiveSinkInfo.TextFileFormat(separator);
         }
-        // The primary partition field, in Sink must be HiveTimePartitionInfo
-//        List<HivePartitionInfo> partitionList = new ArrayList<>();
-//        String primary = hiveInfo.getPrimaryPartition();
-//        if (StringUtils.isNotEmpty(primary)) {
-//            // Hive partitions are by day, hour, and minute
-//            String unit = hiveInfo.getPartitionUnit();
-//        HiveTimePartitionInfo timePartitionInfo = new HiveTimePartitionInfo(
-//                primary, PARTITION_TIME_FORMAT_MAP.get(unit));
-//            partitionList.add(timePartitionInfo);
-//        }
-        // For the secondary partition field, the sink is temporarily encapsulated as HiveFieldPartitionInfo,
-        // TODO the type be set according to the type of the field itself.
-//        if (StringUtils.isNotEmpty(hiveInfo.getSecondaryPartition())) {
-//            partitionList.add(new HiveSinkInfo.HiveFieldPartitionInfo(hiveInfo.getSecondaryPartition()));
-//        }
 
         // Handle hive partition list
-        List<HivePartitionInfo> partitionList;
-        if (CollectionUtils.isNotEmpty(hiveInfo.getPartitionFieldList())) {
-            checkPartitionField(hiveInfo);
-            hiveInfo.getPartitionFieldList().sort(Comparator.comparing(HivePartitionField::getRankNum));
-            partitionList = hiveInfo.getPartitionFieldList().stream().map(s -> {
+        List<HivePartitionInfo> partitionList = new ArrayList<>();
+        List<HivePartitionField> partitionFieldList = hiveInfo.getPartitionFieldList();
+        if (CollectionUtils.isNotEmpty(partitionFieldList)) {
+            SinkInfoUtils.checkPartitionField(hiveInfo.getFieldList(), partitionFieldList);
+            partitionList = partitionFieldList.stream().map(s -> {
                 HivePartitionInfo partition;
+                String fieldFormat = s.getFieldFormat();
                 switch (FieldType.forName(s.getFieldType())) {
-                    case TIME:
-                        if (StringUtils.isNotBlank(s.getFieldFormat())) {
-                            partition = new HiveTimePartitionInfo(s.getFieldName(), s.getFieldFormat());
-                        } else {
-                            partition = new HiveTimePartitionInfo(s.getFieldName(), "HH:mm:ss");
-                        }
-                        break;
                     case TIMESTAMP:
-                        if (StringUtils.isNotBlank(s.getFieldFormat())) {
-                            partition = new HiveTimePartitionInfo(s.getFieldName(), s.getFieldFormat());
-                        } else {
-                            partition = new HiveTimePartitionInfo(s.getFieldName(),
-                                    "yyyy-MM-dd HH:mm:ss");
-                        }
+                        fieldFormat = StringUtils.isNotBlank(fieldFormat) ? fieldFormat : DATA_TIME_FORMAT;
+                        partition = new HiveTimePartitionInfo(s.getFieldName(), fieldFormat);
                         break;
                     case DATE:
-                        if (StringUtils.isNotBlank(s.getFieldFormat())) {
-                            partition = new HiveTimePartitionInfo(s.getFieldName(), s.getFieldFormat());
-                        } else {
-                            partition = new HiveTimePartitionInfo(s.getFieldName(), "yyyy-MM-dd");
-                        }
+                        fieldFormat = StringUtils.isNotBlank(fieldFormat) ? fieldFormat : DATA_FORMAT;
+                        partition = new HiveTimePartitionInfo(s.getFieldName(), fieldFormat);
                         break;
                     default:
                         partition = new HiveFieldPartitionInfo(s.getFieldName());
                 }
                 return partition;
             }).collect(Collectors.toList());
-        } else {
-            partitionList = new ArrayList<>();
         }
 
         // dataPath = dataPath + / + tableName
@@ -221,23 +179,37 @@ public class SinkInfoUtils {
                 dataPath, partitionList.toArray(new HiveSinkInfo.HivePartitionInfo[0]), fileFormat);
     }
 
-    private static void checkPartitionField(HiveSinkResponse hiveInfo) {
-        if (CollectionUtils.isNotEmpty(hiveInfo.getPartitionFieldList())) {
-            Preconditions.checkNotEmpty(hiveInfo.getFieldList(),
-                    String.format("%s:%s",
-                            ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY.getMessage(), hiveInfo.getSinkType()));
-            Map<String, SinkFieldResponse> sinkFieldMap = new HashMap<>(hiveInfo.getFieldList().size());
-            hiveInfo.getFieldList().forEach(s -> sinkFieldMap.put(s.getFieldName(), s));
-            for (HivePartitionField partitionFieldInfo : hiveInfo.getPartitionFieldList()) {
-                Preconditions.checkNotEmpty(partitionFieldInfo.getFieldName(), String.format("%s:%s",
-                        ErrorCodeEnum.SINK_PARTITION_FIELD_NAME_IS_EMPTY.getMessage(), hiveInfo.getSinkType()));
-                SinkFieldResponse sinkFieldResponse = sinkFieldMap.get(partitionFieldInfo.getFieldName());
-                Preconditions.checkNotNull(sinkFieldResponse, String.format("%s:%s",
-                        ErrorCodeEnum.SINK_PARTITION_FIELD_NOT_FOUND_IN_SINK_FIELD_LIST.getMessage(),
-                        hiveInfo.getSinkType()));
-                Preconditions.checkNotEmpty(sinkFieldResponse.getSourceFieldName(), String.format("%s:%s",
-                        ErrorCodeEnum.SOURCE_FIELD_NAME_OF_SINK_PARTITION_FIELD_IS_EMPTY.getMessage(),
-                        hiveInfo.getSinkType()));
+    /**
+     * Check the validation of Hive partition field.
+     */
+    public static void checkPartitionField(List<? extends SinkFieldBase> fieldList,
+            List<HivePartitionField> partitionList) {
+        if (CollectionUtils.isEmpty(partitionList)) {
+            return;
+        }
+
+        if (CollectionUtils.isEmpty(fieldList)) {
+            throw new BusinessException(ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY);
+        }
+
+        Map<String, SinkFieldBase> sinkFieldMap = new HashMap<>(fieldList.size());
+        fieldList.forEach(field -> sinkFieldMap.put(field.getFieldName(), field));
+
+        for (HivePartitionField partitionField : partitionList) {
+            String fieldName = partitionField.getFieldName();
+            if (StringUtils.isBlank(fieldName)) {
+                throw new BusinessException(ErrorCodeEnum.PARTITION_FIELD_NAME_IS_EMPTY);
+            }
+
+            SinkFieldBase sinkField = sinkFieldMap.get(fieldName);
+            if (sinkField == null) {
+                throw new BusinessException(
+                        String.format(ErrorCodeEnum.PARTITION_FIELD_NOT_FOUND.getMessage(), fieldName));
+            }
+
+            if (StringUtils.isBlank(sinkField.getSourceFieldName())) {
+                throw new BusinessException(
+                        String.format(ErrorCodeEnum.PARTITION_FIELD_NO_SOURCE_FIELD.getMessage(), fieldName));
             }
         }
     }
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 4b986d4..223740c 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -387,7 +387,7 @@ CREATE TABLE `inlong_stream_field`
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_format`        varchar(50)  DEFAULT NULL COMMENT 'Field format',
+    `field_format`        varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
@@ -585,7 +585,7 @@ CREATE TABLE `stream_sink_field`
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format',
+    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index b7df0bf..07abf93 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -408,7 +408,7 @@ CREATE TABLE `inlong_stream_field`
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_format`        varchar(50)  DEFAULT NULL COMMENT 'Field format',
+    `field_format`        varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
@@ -614,7 +614,7 @@ CREATE TABLE `stream_sink_field`
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format',
+    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)