You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/30 03:08:40 UTC
[incubator-inlong] branch master updated: [INLONG-3428][Manager] Set the default value and refactor some methods (#3447)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2f970db [INLONG-3428][Manager] Set the default value and refactor some methods (#3447)
2f970db is described below
commit 2f970db82117331e534decb504c972d164e3582c
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 30 11:08:33 2022 +0800
[INLONG-3428][Manager] Set the default value and refactor some methods (#3447)
---
.../apache/inlong/common/enums/DataTypeEnum.java | 2 +-
.../inlong/common/pojo/agent/TaskResult.java | 4 +
.../inlong/manager/client/api/StreamField.java | 4 +-
.../inlong/manager/common/enums/ErrorCodeEnum.java | 8 +-
.../{SinkFieldRequest.java => SinkFieldBase.java} | 16 +--
.../manager/common/pojo/sink/SinkFieldRequest.java | 33 +-----
.../common/pojo/sink/SinkFieldResponse.java | 34 +-----
.../common/pojo/sink/hive/HivePartitionField.java | 11 +-
.../pojo/source/binlog/BinlogSourceRequest.java | 2 +-
.../common/pojo/stream/InlongStreamFieldInfo.java | 4 +-
.../dao/mapper/InlongStreamFieldEntityMapper.java | 2 -
.../mappers/InlongStreamFieldEntityMapper.xml | 16 ---
.../service/sink/hive/HiveStreamSinkOperation.java | 29 +----
.../source/AbstractStreamSourceOperation.java | 8 +-
.../thirdparty/hive/DefaultHiveTableOperator.java | 3 +-
.../thirdparty/sort/util/SerializationUtils.java | 3 +-
.../thirdparty/sort/util/SinkInfoUtils.java | 122 ++++++++-------------
.../main/resources/sql/apache_inlong_manager.sql | 4 +-
.../manager-web/sql/apache_inlong_manager.sql | 4 +-
19 files changed, 90 insertions(+), 219 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 1916a87..8eeb643 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -28,7 +28,7 @@ public enum DataTypeEnum {
DEBEZIUM_JSON("debezium_json");
@Getter
- private String name;
+ private final String name;
DataTypeEnum(String name) {
this.name = name;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index bb746a2..878a65d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -17,8 +17,10 @@
package org.apache.inlong.common.pojo.agent;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.util.List;
@@ -27,6 +29,8 @@ import java.util.List;
*/
@Data
@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class TaskResult {
private List<CmdConfig> cmdConfigs;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
index e1c51ca..ba08425 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
@@ -48,8 +48,8 @@ public class StreamField {
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
private Integer isMetaField = 0;
- @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
- + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+ @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
private String fieldFormat;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 8ae3d57..ff2ba5e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -87,11 +87,9 @@ public enum ErrorCodeEnum {
SINK_TB_NAME_UPDATE_NOT_ALLOWED(1409, "Current status does not allow modification the table name"),
SINK_FIELD_UPDATE_NOT_ALLOWED(1410, "Current status not allowed to modification/delete field"),
SINK_FIELD_LIST_IS_EMPTY(1411, "Sink field list is not allow empty"),
- SINK_PARTITION_FIELD_NAME_IS_EMPTY(1412, "Sink partition field name is not allow empty"),
- SINK_PARTITION_FIELD_NOT_FOUND_IN_SINK_FIELD_LIST(1413,
- "Sink partition field is not found in sink field list"),
- SOURCE_FIELD_NAME_OF_SINK_PARTITION_FIELD_IS_EMPTY(1414,
- "Sink field's source field name is not allowed empty when the field is a partition field"),
+ PARTITION_FIELD_NAME_IS_EMPTY(1412, "Partition field name cannot be empty"),
+ PARTITION_FIELD_NOT_FOUND(1413, "Sink partition field [%s] not found in sink field list"),
+ PARTITION_FIELD_NO_SOURCE_FIELD(1414, "Sink partition field [%s] must have a related source field name"),
WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
similarity index 74%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
index d4a7073..a737e40 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
@@ -22,11 +22,11 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
- * Sink field info
+ * Sink field base info
*/
@Data
-@ApiModel("Sink field info")
-public class SinkFieldRequest {
+@ApiModel("Sink field base info")
+public class SinkFieldBase {
@ApiModelProperty("Field name")
private String fieldName;
@@ -37,20 +37,14 @@ public class SinkFieldRequest {
@ApiModelProperty("Field comment")
private String fieldComment;
- @ApiModelProperty("Required or not, 0: no need, 1: required")
- private Integer isRequired;
-
@ApiModelProperty("Source field name")
private String sourceFieldName;
@ApiModelProperty("Source field type")
private String sourceFieldType;
- @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
- private Integer isMetaField = 0;
-
- @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
- + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+ @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
private String fieldFormat;
@ApiModelProperty("Field order")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index d4a7073..a0a5119 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -20,40 +20,17 @@ package org.apache.inlong.manager.common.pojo.sink;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import lombok.EqualsAndHashCode;
/**
- * Sink field info
+ * Sink field request.
*/
@Data
-@ApiModel("Sink field info")
-public class SinkFieldRequest {
-
- @ApiModelProperty("Field name")
- private String fieldName;
-
- @ApiModelProperty("Field type")
- private String fieldType;
-
- @ApiModelProperty("Field comment")
- private String fieldComment;
-
- @ApiModelProperty("Required or not, 0: no need, 1: required")
- private Integer isRequired;
-
- @ApiModelProperty("Source field name")
- private String sourceFieldName;
-
- @ApiModelProperty("Source field type")
- private String sourceFieldType;
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Sink field request")
+public class SinkFieldRequest extends SinkFieldBase {
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
private Integer isMetaField = 0;
- @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
- + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
- private String fieldFormat;
-
- @ApiModelProperty("Field order")
- private Short rankNum;
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index cfabcfc..7ad5eec 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -20,45 +20,19 @@ package org.apache.inlong.manager.common.pojo.sink;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import lombok.EqualsAndHashCode;
/**
* Sink field response
*/
@Data
+@EqualsAndHashCode(callSuper = true)
@ApiModel("Sink field response")
-public class SinkFieldResponse {
+public class SinkFieldResponse extends SinkFieldBase {
private Integer id;
- @ApiModelProperty("Sink ID")
- private Integer sinkId;
-
- @ApiModelProperty("Field name")
- private String fieldName;
-
- @ApiModelProperty("Field type")
- private String fieldType;
-
- @ApiModelProperty("Field comment")
- private String fieldComment;
-
- @ApiModelProperty("Required or not, 0: no need, 1: required")
- private Integer isRequired;
-
- @ApiModelProperty("Source field name")
- private String sourceFieldName;
-
- @ApiModelProperty("Source field type")
- private String sourceFieldType;
-
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
- private Integer isMetaField = 0;
-
- @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
- + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
- private String fieldFormat;
-
- @ApiModelProperty("Field order")
- private Short rankNum;
+ private Integer isMetaField;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java
index 678bd77..d774af2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HivePartitionField.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
- * Sink partition field info
+ * Hive partition field info
*/
@Data
@ApiModel("Hive partition field")
@@ -34,13 +34,8 @@ public class HivePartitionField {
@ApiModelProperty("Field type")
private String fieldType;
- @ApiModelProperty("Field comment")
- private String fieldComment;
-
- @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
- + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+ @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
private String fieldFormat;
- @ApiModelProperty("Field order")
- private Short rankNum = 0;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index cdb1e2f..4af7441 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -84,7 +84,7 @@ public class BinlogSourceRequest extends SourceRequest {
* generally not used.
*/
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
- private String snapshotMode;
+ private String snapshotMode = "initial";
@ApiModelProperty("The file path to store offset info")
private String offsetFilename;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
index d220267..d991e0b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
@@ -57,8 +57,8 @@ public class InlongStreamFieldInfo {
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
private Integer isMetaField = 0;
- @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
- + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+ @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
private String fieldFormat;
@ApiModelProperty(value = "field rank num")
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
index fb20c11..c598c31 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
@@ -38,8 +38,6 @@ public interface InlongStreamFieldEntityMapper {
List<InlongStreamFieldEntity> selectFields(@Param("groupId") String groupId,
@Param("streamId") String streamId);
- int updateByPrimaryKey(InlongStreamFieldEntity record);
-
int deleteByPrimaryKey(Integer id);
/**
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index ab354cb..1a7d585 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -110,22 +110,6 @@
and field.is_predefined_field = 0
</select>
- <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
- update inlong_stream_field
- set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- is_predefined_field = #{isPredefinedField,jdbcType=INTEGER},
- field_name = #{fieldName,jdbcType=VARCHAR},
- field_value = #{fieldValue,jdbcType=VARCHAR},
- pre_expression = #{preExpression,jdbcType=VARCHAR},
- field_type = #{fieldType,jdbcType=VARCHAR},
- is_meta_field = #(isMetaField,jdbcType=SMALLINT),
- field_format = #{fieldFormat,jdbcType=VARCHAR},
- field_comment = #{fieldComment,jdbcType=VARCHAR},
- rank_num = #{rankNum,jdbcType=SMALLINT},
- is_deleted = #{isDeleted,jdbcType=INTEGER}
- where id = #{id,jdbcType=INTEGER}
- </update>
<update id="logicDeleteAllByIdentifier">
update inlong_stream_field
set is_deleted = 1
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
index d36b59e..8b3e6d8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
-import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkDTO;
@@ -44,6 +43,7 @@ import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.sink.StreamSinkOperation;
+import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -52,9 +52,7 @@ import org.springframework.stereotype.Service;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.function.Supplier;
/**
@@ -83,7 +81,7 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
Preconditions.checkTrue(Constant.SINK_HIVE.equals(sinkType),
ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
HiveSinkRequest hiveRequest = (HiveSinkRequest) request;
- checkPartitionField(hiveRequest);
+ SinkInfoUtils.checkPartitionField(hiveRequest.getFieldList(), hiveRequest.getPartitionFieldList());
StreamSinkEntity entity = CommonBeanUtils.copyProperties(hiveRequest, StreamSinkEntity::new);
entity.setStatus(EntityStatus.SINK_NEW.getCode());
entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
@@ -108,27 +106,6 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
return sinkId;
}
- private void checkPartitionField(HiveSinkRequest hiveRequest) {
- if (CollectionUtils.isNotEmpty(hiveRequest.getPartitionFieldList())) {
- Preconditions.checkNotEmpty(hiveRequest.getFieldList(),
- String.format("%s:%s",
- ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY.getMessage(), hiveRequest.getSinkType()));
- Map<String, SinkFieldRequest> sinkFieldMap = new HashMap<>(hiveRequest.getFieldList().size());
- hiveRequest.getFieldList().forEach(s -> sinkFieldMap.put(s.getFieldName(), s));
- for (HivePartitionField partitionFieldInfo : hiveRequest.getPartitionFieldList()) {
- Preconditions.checkNotEmpty(partitionFieldInfo.getFieldName(), String.format("%s:%s",
- ErrorCodeEnum.SINK_PARTITION_FIELD_NAME_IS_EMPTY.getMessage(), hiveRequest.getSinkType()));
- SinkFieldRequest sinkFieldRequest = sinkFieldMap.get(partitionFieldInfo.getFieldName());
- Preconditions.checkNotNull(sinkFieldRequest, String.format("%s:%s",
- ErrorCodeEnum.SINK_PARTITION_FIELD_NOT_FOUND_IN_SINK_FIELD_LIST.getMessage(),
- hiveRequest.getSinkType()));
- Preconditions.checkNotEmpty(sinkFieldRequest.getSourceFieldName(), String.format("%s:%s",
- ErrorCodeEnum.SOURCE_FIELD_NAME_OF_SINK_PARTITION_FIELD_IS_EMPTY.getMessage(),
- hiveRequest.getSinkType()));
- }
- }
- }
-
@Override
public void saveFieldOpt(SinkRequest request) {
List<SinkFieldRequest> fieldList = request.getFieldList();
@@ -212,7 +189,7 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
HiveSinkRequest hiveRequest = (HiveSinkRequest) request;
- checkPartitionField(hiveRequest);
+ SinkInfoUtils.checkPartitionField(hiveRequest.getFieldList(), hiveRequest.getPartitionFieldList());
CommonBeanUtils.copyProperties(hiveRequest, entity, true);
try {
HiveSinkDTO dto = HiveSinkDTO.getFromRequest(hiveRequest);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index d709b9d..8c1dbbc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -119,7 +119,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
if (!SourceState.ALLOWED_UPDATE.contains(entity.getStatus())) {
- throw new RuntimeException(String.format("Source=%s is not allowed to update, "
+ throw new BusinessException(String.format("Source=%s is not allowed to update, "
+ "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
}
// Setting updated parameters of stream source entity.
@@ -138,7 +138,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
SourceState curState = SourceState.forCode(existEntity.getStatus());
SourceState nextState = SourceState.TO_BE_ISSUED_FROZEN;
if (!SourceState.isAllowedTransition(curState, nextState)) {
- throw new RuntimeException(String.format("Source=%s is not allowed to stop", existEntity));
+ throw new BusinessException(String.format("Source=%s is not allowed to stop", existEntity));
}
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
curEntity.setVersion(existEntity.getVersion() + 1);
@@ -155,7 +155,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
SourceState curState = SourceState.forCode(existEntity.getStatus());
SourceState nextState = SourceState.TO_BE_ISSUED_ACTIVE;
if (!SourceState.isAllowedTransition(curState, nextState)) {
- throw new RuntimeException(String.format("Source=%s is not allowed to restart", existEntity));
+ throw new BusinessException(String.format("Source=%s is not allowed to restart", existEntity));
}
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
curEntity.setVersion(existEntity.getVersion() + 1);
@@ -179,7 +179,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
nextState = SourceState.SOURCE_DISABLE;
}
if (!SourceState.isAllowedTransition(curState, nextState)) {
- throw new RuntimeException(String.format("Source=%s is not allowed to delete", existEntity));
+ throw new BusinessException(String.format("Source=%s is not allowed to delete", existEntity));
}
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
curEntity.setVersion(existEntity.getVersion() + 1);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java
index 02de742..98dcd9d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/DefaultHiveTableOperator.java
@@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
+
import static java.util.stream.Collectors.toList;
/**
@@ -131,7 +131,6 @@ public class DefaultHiveTableOperator implements IHiveTableOperator {
// Set partition fields
if (CollectionUtils.isNotEmpty(hiveInfo.getPartitionFieldList())) {
- hiveInfo.getPartitionFieldList().sort(Comparator.comparing(HivePartitionField::getRankNum));
for (HivePartitionField field : hiveInfo.getPartitionFieldList()) {
HiveColumnQueryBean columnBean = new HiveColumnQueryBean();
columnBean.setColumnName(field.getFieldName());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index b4d4a52..cea3110 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -135,8 +135,7 @@ public class SerializationUtils {
/**
* Get deserialization info for File
*/
- private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse,
- InlongStreamInfo streamInfo) {
+ private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse, InlongStreamInfo streamInfo) {
String serializationType = sourceResponse.getSerializationType();
DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
switch (dataType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 8b8001c..13bf46c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -23,14 +23,14 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.FileFormat;
import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
-import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldBase;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
@@ -44,28 +44,16 @@ import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class SinkInfoUtils {
- private static final Map<String, String> PARTITION_TIME_FORMAT_MAP = new HashMap<>();
-
- private static final Map<String, TimeUnit> PARTITION_TIME_UNIT_MAP = new HashMap<>();
-
- static {
- PARTITION_TIME_FORMAT_MAP.put("D", "yyyyMMdd");
- PARTITION_TIME_FORMAT_MAP.put("H", "yyyyMMddHH");
- PARTITION_TIME_FORMAT_MAP.put("I", "yyyyMMddHHmm");
-
- PARTITION_TIME_UNIT_MAP.put("D", TimeUnit.DAYS);
- PARTITION_TIME_UNIT_MAP.put("H", TimeUnit.HOURS);
- PARTITION_TIME_UNIT_MAP.put("I", TimeUnit.MINUTES);
- }
+ private static final String DATA_FORMAT = "yyyyMMddHH";
+ private static final String TIME_FORMAT = "HHmmss";
+ private static final String DATA_TIME_FORMAT = "yyyyMMddHHmmss";
/**
* Create sink info for DataFlowInfo.
@@ -153,59 +141,29 @@ public class SinkInfoUtils {
} else {
fileFormat = new HiveSinkInfo.TextFileFormat(separator);
}
- // The primary partition field, in Sink must be HiveTimePartitionInfo
-// List<HivePartitionInfo> partitionList = new ArrayList<>();
-// String primary = hiveInfo.getPrimaryPartition();
-// if (StringUtils.isNotEmpty(primary)) {
-// // Hive partitions are by day, hour, and minute
-// String unit = hiveInfo.getPartitionUnit();
-// HiveTimePartitionInfo timePartitionInfo = new HiveTimePartitionInfo(
-// primary, PARTITION_TIME_FORMAT_MAP.get(unit));
-// partitionList.add(timePartitionInfo);
-// }
- // For the secondary partition field, the sink is temporarily encapsulated as HiveFieldPartitionInfo,
- // TODO the type be set according to the type of the field itself.
-// if (StringUtils.isNotEmpty(hiveInfo.getSecondaryPartition())) {
-// partitionList.add(new HiveSinkInfo.HiveFieldPartitionInfo(hiveInfo.getSecondaryPartition()));
-// }
// Handle hive partition list
- List<HivePartitionInfo> partitionList;
- if (CollectionUtils.isNotEmpty(hiveInfo.getPartitionFieldList())) {
- checkPartitionField(hiveInfo);
- hiveInfo.getPartitionFieldList().sort(Comparator.comparing(HivePartitionField::getRankNum));
- partitionList = hiveInfo.getPartitionFieldList().stream().map(s -> {
+ List<HivePartitionInfo> partitionList = new ArrayList<>();
+ List<HivePartitionField> partitionFieldList = hiveInfo.getPartitionFieldList();
+ if (CollectionUtils.isNotEmpty(partitionFieldList)) {
+ SinkInfoUtils.checkPartitionField(hiveInfo.getFieldList(), partitionFieldList);
+ partitionList = partitionFieldList.stream().map(s -> {
HivePartitionInfo partition;
+ String fieldFormat = s.getFieldFormat();
switch (FieldType.forName(s.getFieldType())) {
- case TIME:
- if (StringUtils.isNotBlank(s.getFieldFormat())) {
- partition = new HiveTimePartitionInfo(s.getFieldName(), s.getFieldFormat());
- } else {
- partition = new HiveTimePartitionInfo(s.getFieldName(), "HH:mm:ss");
- }
- break;
case TIMESTAMP:
- if (StringUtils.isNotBlank(s.getFieldFormat())) {
- partition = new HiveTimePartitionInfo(s.getFieldName(), s.getFieldFormat());
- } else {
- partition = new HiveTimePartitionInfo(s.getFieldName(),
- "yyyy-MM-dd HH:mm:ss");
- }
+ fieldFormat = StringUtils.isNotBlank(fieldFormat) ? fieldFormat : DATA_TIME_FORMAT;
+ partition = new HiveTimePartitionInfo(s.getFieldName(), fieldFormat);
break;
case DATE:
- if (StringUtils.isNotBlank(s.getFieldFormat())) {
- partition = new HiveTimePartitionInfo(s.getFieldName(), s.getFieldFormat());
- } else {
- partition = new HiveTimePartitionInfo(s.getFieldName(), "yyyy-MM-dd");
- }
+ fieldFormat = StringUtils.isNotBlank(fieldFormat) ? fieldFormat : DATA_FORMAT;
+ partition = new HiveTimePartitionInfo(s.getFieldName(), fieldFormat);
break;
default:
partition = new HiveFieldPartitionInfo(s.getFieldName());
}
return partition;
}).collect(Collectors.toList());
- } else {
- partitionList = new ArrayList<>();
}
// dataPath = dataPath + / + tableName
@@ -221,23 +179,37 @@ public class SinkInfoUtils {
dataPath, partitionList.toArray(new HiveSinkInfo.HivePartitionInfo[0]), fileFormat);
}
- private static void checkPartitionField(HiveSinkResponse hiveInfo) {
- if (CollectionUtils.isNotEmpty(hiveInfo.getPartitionFieldList())) {
- Preconditions.checkNotEmpty(hiveInfo.getFieldList(),
- String.format("%s:%s",
- ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY.getMessage(), hiveInfo.getSinkType()));
- Map<String, SinkFieldResponse> sinkFieldMap = new HashMap<>(hiveInfo.getFieldList().size());
- hiveInfo.getFieldList().forEach(s -> sinkFieldMap.put(s.getFieldName(), s));
- for (HivePartitionField partitionFieldInfo : hiveInfo.getPartitionFieldList()) {
- Preconditions.checkNotEmpty(partitionFieldInfo.getFieldName(), String.format("%s:%s",
- ErrorCodeEnum.SINK_PARTITION_FIELD_NAME_IS_EMPTY.getMessage(), hiveInfo.getSinkType()));
- SinkFieldResponse sinkFieldResponse = sinkFieldMap.get(partitionFieldInfo.getFieldName());
- Preconditions.checkNotNull(sinkFieldResponse, String.format("%s:%s",
- ErrorCodeEnum.SINK_PARTITION_FIELD_NOT_FOUND_IN_SINK_FIELD_LIST.getMessage(),
- hiveInfo.getSinkType()));
- Preconditions.checkNotEmpty(sinkFieldResponse.getSourceFieldName(), String.format("%s:%s",
- ErrorCodeEnum.SOURCE_FIELD_NAME_OF_SINK_PARTITION_FIELD_IS_EMPTY.getMessage(),
- hiveInfo.getSinkType()));
+ /**
+ * Check the validity of the Hive partition fields.
+ */
+ public static void checkPartitionField(List<? extends SinkFieldBase> fieldList,
+ List<HivePartitionField> partitionList) {
+ if (CollectionUtils.isEmpty(partitionList)) {
+ return;
+ }
+
+ if (CollectionUtils.isEmpty(fieldList)) {
+ throw new BusinessException(ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY);
+ }
+
+ Map<String, SinkFieldBase> sinkFieldMap = new HashMap<>(fieldList.size());
+ fieldList.forEach(field -> sinkFieldMap.put(field.getFieldName(), field));
+
+ for (HivePartitionField partitionField : partitionList) {
+ String fieldName = partitionField.getFieldName();
+ if (StringUtils.isBlank(fieldName)) {
+ throw new BusinessException(ErrorCodeEnum.PARTITION_FIELD_NAME_IS_EMPTY);
+ }
+
+ SinkFieldBase sinkField = sinkFieldMap.get(fieldName);
+ if (sinkField == null) {
+ throw new BusinessException(
+ String.format(ErrorCodeEnum.PARTITION_FIELD_NOT_FOUND.getMessage(), fieldName));
+ }
+
+ if (StringUtils.isBlank(sinkField.getSourceFieldName())) {
+ throw new BusinessException(
+ String.format(ErrorCodeEnum.PARTITION_FIELD_NO_SOURCE_FIELD.getMessage(), fieldName));
}
}
}
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 4b986d4..223740c 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -387,7 +387,7 @@ CREATE TABLE `inlong_stream_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
@@ -585,7 +585,7 @@ CREATE TABLE `stream_sink_field`
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index b7df0bf..07abf93 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -408,7 +408,7 @@ CREATE TABLE `inlong_stream_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
@@ -614,7 +614,7 @@ CREATE TABLE `stream_sink_field`
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)