You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/11 02:26:00 UTC
[incubator-inlong] branch master updated: [INLONG-3589][Manager] Add Iceberg sink info for Sort (#3590)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3693de893 [INLONG-3589][Manager] Add Iceberg sink info for Sort (#3590)
3693de893 is described below
commit 3693de89319bcc150ada7b59bedcfae08c5205af
Author: healchow <he...@gmail.com>
AuthorDate: Mon Apr 11 10:25:54 2022 +0800
[INLONG-3589][Manager] Add Iceberg sink info for Sort (#3590)
---
.../inlong/manager/common/enums/Constant.java | 12 --------
.../inlong/manager/common/enums/SinkType.java | 10 ++++++-
.../inlong/manager/common/enums/SourceType.java | 4 +--
.../manager/common/pojo/sink/SinkFieldBase.java | 8 +++---
.../common/pojo/sink/ck/ClickHouseSinkRequest.java | 4 +--
.../pojo/sink/ck/ClickHouseSinkResponse.java | 4 +--
.../common/pojo/sink/hive/HiveSinkRequest.java | 4 +--
.../common/pojo/sink/hive/HiveSinkResponse.java | 4 +--
.../pojo/sink/iceberg/IcebergSinkRequest.java | 4 +--
.../pojo/sink/iceberg/IcebergSinkResponse.java | 4 +--
.../common/pojo/sink/kafka/KafkaSinkRequest.java | 4 +--
.../common/pojo/sink/kafka/KafkaSinkResponse.java | 4 +--
.../common/pojo/source/kafka/KafkaSourceDTO.java | 4 ++-
.../manager/dao/entity/StreamSinkFieldEntity.java | 2 +-
.../service/sink/StreamSinkServiceImpl.java | 4 +--
...Operation.java => ClickHouseSinkOperation.java} | 32 +++++++++++-----------
...amSinkOperation.java => HiveSinkOperation.java} | 19 ++++++-------
...inkOperation.java => IcebergSinkOperation.java} | 21 +++++++-------
...mSinkOperation.java => KafkaSinkOperation.java} | 22 +++++++--------
...Operation.java => AbstractSourceOperation.java} | 4 +--
...ceOperation.java => BinlogSourceOperation.java} | 4 +--
...urceOperation.java => FileSourceOperation.java} | 4 +--
...rceOperation.java => KafkaSourceOperation.java} | 4 +--
.../hive/CreateHiveTableEventSelector.java | 6 ++--
.../thirdparty/sort/util/SinkInfoUtils.java | 29 ++++++++++++++------
.../stream/CreateStreamWorkflowDefinition.java | 4 +--
.../core/sink/ClickHouseStreamSinkServiceTest.java | 9 +++---
.../core/sink/HiveStreamSinkServiceTest.java | 11 ++++----
.../core/sink/IcebergStreamSinkServiceTest.java | 11 ++++----
.../core/sink/KafkaStreamSinkServiceTest.java | 9 +++---
30 files changed, 138 insertions(+), 127 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 44c185d71..778a796ff 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -28,14 +28,6 @@ public class Constant {
public static final String URL_SPLITTER = ",";
public static final String HOST_SPLITTER = ":";
- public static final String SINK_HIVE = "HIVE";
-
- public static final String SINK_KAFKA = "KAFKA";
-
- public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
-
- public static final String SINK_ICEBERG = "ICEBERG";
-
public static final String DATA_SOURCE_DB = "DB";
public static final String DATA_SOURCE_FILE = "FILE";
@@ -69,10 +61,6 @@ public class Constant {
public static final String REQUEST_IS_EMPTY = "request is empty";
- public static final String SINK_TYPE_IS_EMPTY = "sinkType is empty";
-
- public static final String SINK_TYPE_NOT_SAME = "Expected sinkType is %s, but found %s";
-
public static final String PULSAR_TOPIC_TYPE_SERIAL = "SERIAL";
public static final String PULSAR_TOPIC_TYPE_PARALLEL = "PARALLEL";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index 19a547bfa..3eea7f22a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -21,7 +21,15 @@ import java.util.Locale;
public enum SinkType {
- HIVE, ES, CLICKHOUSE, ICEBERG, KAFKA;
+ HIVE, KAFKA, ICEBERG, CLICKHOUSE;
+
+ public static final String SINK_HIVE = "HIVE";
+ public static final String SINK_KAFKA = "KAFKA";
+ public static final String SINK_ICEBERG = "ICEBERG";
+ public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
+
+ public static final String SINK_TYPE_IS_EMPTY = "Sink type is empty";
+ public static final String SINK_TYPE_NOT_SAME = "Expected sink type is %s, but found %s";
/**
* Get the SinkType enum via the given sinkType string
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 33ecdca8f..42bfd9b91 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -37,8 +37,8 @@ public enum SourceType {
public static final String SOURCE_SQL = "SQL";
public static final String SOURCE_BINLOG = "BINLOG";
public static final String SOURCE_KAFKA = "KAFKA";
- public static final String SOURCE_TYPE_IS_EMPTY = "sourceType is empty";
- public static final String SOURCE_TYPE_NOT_SAME = "Expected sourceType is %s, but found %s";
+ public static final String SOURCE_TYPE_IS_EMPTY = "Source type is empty";
+ public static final String SOURCE_TYPE_NOT_SAME = "Expected source type is %s, but found %s";
@Getter
private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
index ff355e32c..42a0efecf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
@@ -43,16 +43,16 @@ public class SinkFieldBase {
@ApiModelProperty("Source field type")
private String sourceFieldType;
- @ApiModelProperty("Field length")
+ @ApiModelProperty("Length of fixed type")
private Integer fieldLength;
- @ApiModelProperty("Field precision")
+ @ApiModelProperty("Precision of decimal type, that is, field length. precision >= scale")
private Integer fieldPrecision;
- @ApiModelProperty("Field scale")
+ @ApiModelProperty("Range of decimal type, that is, the number of decimal places. precision >= scale")
private Integer fieldScale;
- @ApiModelProperty("Partition strategy, including: Identity, Year, Month, Day, Hour, Bucket")
+ @ApiModelProperty("Partition strategy, including: None, Identity, Year, Month, Day, Hour, Bucket, Truncate")
private String partitionStrategy;
@ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
index 2f28191d2..6493ae5c3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Request of the ClickHouse sink info")
-@JsonTypeDefine(value = Constant.SINK_CLICKHOUSE)
+@JsonTypeDefine(value = SinkType.SINK_CLICKHOUSE)
public class ClickHouseSinkRequest extends SinkRequest {
@ApiModelProperty("ClickHouse JDBC URL")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
index 0e8ea89f1..bb0e47654 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
/**
@@ -60,7 +60,7 @@ public class ClickHouseSinkResponse extends SinkResponse {
private Integer writeMaxRetryTimes;
public ClickHouseSinkResponse() {
- this.sinkType = Constant.SINK_CLICKHOUSE;
+ this.sinkType = SinkType.SINK_CLICKHOUSE;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
index 5fa303f1e..80d8dc52b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -35,7 +35,7 @@ import java.util.List;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Request of the Hive sink info")
-@JsonTypeDefine(value = Constant.SINK_HIVE)
+@JsonTypeDefine(value = SinkType.SINK_HIVE)
public class HiveSinkRequest extends SinkRequest {
@ApiModelProperty("Whether to enable create table, 1: enable, 0: disable, default is 1")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
index 1b597f91d..d055c3bf4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import java.util.List;
@@ -73,7 +73,7 @@ public class HiveSinkResponse extends SinkResponse {
private String dataSeparator;
public HiveSinkResponse() {
- this.sinkType = Constant.SINK_HIVE;
+ this.sinkType = SinkType.SINK_HIVE;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index 1fb0a24f4..6938daed2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Request of the Iceberg sink info")
-@JsonTypeDefine(value = Constant.SINK_ICEBERG)
+@JsonTypeDefine(value = SinkType.SINK_ICEBERG)
public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("Hive JDBC URL")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java
index f40ceb8bc..8b2247b26 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
/**
@@ -65,6 +65,6 @@ public class IcebergSinkResponse extends SinkResponse {
private String dataConsistency;
public IcebergSinkResponse() {
- this.sinkType = Constant.SINK_ICEBERG;
+ this.sinkType = SinkType.SINK_ICEBERG;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
index 791f8dc08..49fb9f81f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Request of the Kafka sink info")
-@JsonTypeDefine(value = Constant.SINK_KAFKA)
+@JsonTypeDefine(value = SinkType.SINK_KAFKA)
public class KafkaSinkRequest extends SinkRequest {
@ApiModelProperty("Kafka bootstrap servers")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
index f7929445f..e6f62f4f6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
/**
@@ -51,7 +51,7 @@ public class KafkaSinkResponse extends SinkResponse {
private String autoOffsetReset;
public KafkaSinkResponse() {
- this.sinkType = Constant.SINK_KAFKA;
+ this.sinkType = SinkType.SINK_KAFKA;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 2c5c28f48..4eb227f5c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -32,10 +32,10 @@ import javax.validation.constraints.NotNull;
/**
* kafka source information data transfer object.
*/
+@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
-@Data
public class KafkaSourceDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -62,6 +62,8 @@ public class KafkaSourceDTO {
private String topicPartitionOffset;
/**
+ * The strategy of auto offset reset.
+ *
* @see <a href="https://docs.confluent.io/platform/current/clients/consumer.html">Kafka_consumer_config</a>
*/
@ApiModelProperty(value = "The strategy of auto offset reset",
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 00dee4daf..89ccebce2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -41,7 +41,7 @@ public class StreamSinkFieldEntity implements Serializable {
private Integer fieldLength;
private Integer fieldPrecision;
private Integer fieldScale;
- private String partitionStrategy; // For Iceberg, including: Identity, Year, Month, Day, Hour, Bucket
+ private String partitionStrategy;
private Integer isMetaField;
private String fieldFormat;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index f030959bd..f49f12508 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -333,7 +333,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
for (SinkApproveDTO dto : approveList) {
// According to the sink type, save sink information
String sinkType = dto.getSinkType();
- Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
+ Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
StreamSinkEntity entity = new StreamSinkEntity();
entity.setId(dto.getId());
@@ -357,7 +357,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
String streamId = request.getInlongStreamId();
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
String sinkType = request.getSinkType();
- Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
+ Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
index 2be4ba955..6aff31f3c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
@@ -20,16 +20,10 @@ package org.apache.inlong.manager.service.sink.ck;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.function.Supplier;
-import javax.validation.constraints.NotNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
@@ -53,13 +47,19 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+
/**
* ClickHouse sink operation
*/
@Service
-public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
+public class ClickHouseSinkOperation implements StreamSinkOperation {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseStreamSinkOperation.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkOperation.class);
@Autowired
private ObjectMapper objectMapper;
@@ -76,7 +76,7 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
@Override
public Integer saveOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(sinkType),
+ Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(sinkType),
ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
ClickHouseSinkRequest sinkRequest = (ClickHouseSinkRequest) request;
@@ -141,8 +141,8 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_CLICKHOUSE, existType));
+ Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
SinkResponse response = this.getFromEntity(entity, ClickHouseSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -161,8 +161,8 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
}
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_CLICKHOUSE, existType));
+ Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
ClickHouseSinkDTO dto = ClickHouseSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -182,8 +182,8 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
@Override
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(sinkType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_CLICKHOUSE, sinkType));
+ Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(sinkType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
index 92eea579c..dc1b3aa09 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
@@ -22,7 +22,6 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkType;
@@ -59,9 +58,9 @@ import java.util.function.Supplier;
* Hive sink operation
*/
@Service
-public class HiveStreamSinkOperation implements StreamSinkOperation {
+public class HiveSinkOperation implements StreamSinkOperation {
- private static final Logger LOGGER = LoggerFactory.getLogger(HiveStreamSinkOperation.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HiveSinkOperation.class);
@Autowired
private ObjectMapper objectMapper;
@@ -78,7 +77,7 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
@Override
public Integer saveOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_HIVE.equals(sinkType),
+ Preconditions.checkTrue(SinkType.SINK_HIVE.equals(sinkType),
ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
HiveSinkRequest hiveRequest = (HiveSinkRequest) request;
SinkInfoUtils.checkPartitionField(hiveRequest.getFieldList(), hiveRequest.getPartitionFieldList());
@@ -142,8 +141,8 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_HIVE.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_HIVE, existType));
+ Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
SinkResponse response = this.getFromEntity(entity, HiveSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -162,8 +161,8 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
}
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_HIVE.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_HIVE, existType));
+ Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -183,8 +182,8 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
@Override
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_HIVE.equals(sinkType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_HIVE, sinkType));
+ Preconditions.checkTrue(SinkType.SINK_HIVE.equals(sinkType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
index 284cbf7ad..d09fa3ea3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
@@ -22,9 +22,8 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
@@ -54,9 +53,9 @@ import java.util.List;
import java.util.function.Supplier;
@Service
-public class IcebergStreamSinkOperation implements StreamSinkOperation {
+public class IcebergSinkOperation implements StreamSinkOperation {
- private static final Logger LOGGER = LoggerFactory.getLogger(IcebergStreamSinkOperation.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSinkOperation.class);
@Autowired
private ObjectMapper objectMapper;
@@ -73,7 +72,7 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
@Override
public Integer saveOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(sinkType),
+ Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(sinkType),
ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
IcebergSinkRequest icebergSinkRequest = (IcebergSinkRequest) request;
@@ -136,8 +135,8 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_ICEBERG, existType));
+ Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
SinkResponse response = this.getFromEntity(entity, IcebergSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -156,8 +155,8 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
}
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_ICEBERG, existType));
+ Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -177,8 +176,8 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
@Override
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(sinkType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_ICEBERG, sinkType));
+ Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(sinkType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
similarity index 92%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
index c1cdf4926..e8a4a169c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
@@ -22,7 +22,6 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkType;
@@ -58,9 +57,9 @@ import java.util.function.Supplier;
* Kafka sink operation
*/
@Service
-public class KafkaStreamSinkOperation implements StreamSinkOperation {
+public class KafkaSinkOperation implements StreamSinkOperation {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSinkOperation.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperation.class);
@Autowired
private ObjectMapper objectMapper;
@@ -77,7 +76,7 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
@Override
public Integer saveOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_KAFKA.equals(sinkType),
+ Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(sinkType),
ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
KafkaSinkRequest kafkaSinkRequest = (KafkaSinkRequest) request;
@@ -140,12 +139,11 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_KAFKA.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_KAFKA, existType));
+ Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
SinkResponse response = this.getFromEntity(entity, KafkaSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
- List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities,
- SinkFieldResponse::new);
+ List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
response.setFieldList(infos);
return response;
}
@@ -157,8 +155,8 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
return result;
}
String existType = entity.getSinkType();
- Preconditions.checkTrue(Constant.SINK_KAFKA.equals(existType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_KAFKA, existType));
+ Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
KafkaSinkDTO dto = KafkaSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -178,8 +176,8 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
@Override
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
- Preconditions.checkTrue(Constant.SINK_KAFKA.equals(sinkType),
- String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_KAFKA, sinkType));
+ Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(sinkType),
+ String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index 7ab709784..e99c00421 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -44,9 +44,9 @@ import java.util.List;
/**
* Default operation of stream source.
*/
-public abstract class AbstractStreamSourceOperation implements StreamSourceOperation {
+public abstract class AbstractSourceOperation implements StreamSourceOperation {
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamSourceOperation.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceOperation.class);
@Autowired
protected StreamSourceEntityMapper sourceMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
index 2c0d296c6..3f9c35923 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
-import org.apache.inlong.manager.service.source.AbstractStreamSourceOperation;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
* Binlog source operation
*/
@Service
-public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
+public class BinlogSourceOperation extends AbstractSourceOperation {
@Autowired
private ObjectMapper objectMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
index 29e1e4938..da5bc72b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
@@ -29,14 +29,14 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
-import org.apache.inlong.manager.service.source.AbstractStreamSourceOperation;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.function.Supplier;
@Service
-public class FileStreamSourceOperation extends AbstractStreamSourceOperation {
+public class FileSourceOperation extends AbstractSourceOperation {
@Autowired
private ObjectMapper objectMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
index f54a3a782..3fd771d74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
-import org.apache.inlong.manager.service.source.AbstractStreamSourceOperation;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
* kafka stream source operation.
*/
@Service
-public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
+public class KafkaSourceOperation extends AbstractSourceOperation {
@Autowired
private ObjectMapper objectMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
index 5555bcfc8..214d5f0f2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.service.thirdparty.hive;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.workflow.event.EventSelector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -54,7 +54,7 @@ public class CreateHiveTableEventSelector implements EventSelector {
return false;
}
String groupId = form.getInlongGroupId();
- List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, Constant.SINK_HIVE,
+ List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
streamMapper.selectByGroupId(groupId)
.stream()
.map(InlongStreamEntity::getInlongStreamId)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index ad9a734d5..d1490c020 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -40,6 +41,7 @@ import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFieldPartitionInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.IcebergSinkInfo;
import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
@@ -66,10 +68,12 @@ public class SinkInfoUtils {
sinkInfo = createHiveSinkInfo((HiveSinkResponse) sinkResponse, sinkFields);
} else if (SinkType.forType(sinkType) == SinkType.KAFKA) {
sinkInfo = createKafkaSinkInfo(sourceResponse, (KafkaSinkResponse) sinkResponse, sinkFields);
+ } else if (SinkType.SINK_ICEBERG.equals(sinkType)) {
+ sinkInfo = createIcebergSinkInfo((IcebergSinkResponse) sinkResponse, sinkFields);
} else if (SinkType.forType(sinkType) == SinkType.CLICKHOUSE) {
sinkInfo = createClickhouseSinkInfo((ClickHouseSinkResponse) sinkResponse, sinkFields);
} else {
- throw new RuntimeException(String.format("Unsupported SinkType {%s}", sinkType));
+ throw new BusinessException(String.format("Unsupported SinkType {%s}", sinkType));
}
return sinkInfo;
}
@@ -77,16 +81,16 @@ public class SinkInfoUtils {
private static ClickHouseSinkInfo createClickhouseSinkInfo(ClickHouseSinkResponse sinkResponse,
List<FieldInfo> sinkFields) {
if (StringUtils.isEmpty(sinkResponse.getJdbcUrl())) {
- throw new RuntimeException(String.format("ClickHouse={%s} server url cannot be empty", sinkResponse));
+ throw new BusinessException(String.format("ClickHouse={%s} server url cannot be empty", sinkResponse));
} else if (CollectionUtils.isEmpty(sinkResponse.getFieldList())) {
- throw new RuntimeException(String.format("ClickHouse={%s} fields cannot be empty", sinkResponse));
+ throw new BusinessException(String.format("ClickHouse={%s} fields cannot be empty", sinkResponse));
} else if (StringUtils.isEmpty(sinkResponse.getTableName())) {
- throw new RuntimeException(String.format("ClickHouse={%s} table name cannot be empty", sinkResponse));
+ throw new BusinessException(String.format("ClickHouse={%s} table name cannot be empty", sinkResponse));
} else if (StringUtils.isEmpty(sinkResponse.getDatabaseName())) {
- throw new RuntimeException(String.format("ClickHouse={%s} database name cannot be empty", sinkResponse));
+ throw new BusinessException(String.format("ClickHouse={%s} database name cannot be empty", sinkResponse));
}
if (sinkResponse.getDistributedTable() == null) {
- throw new RuntimeException(String.format("ClickHouse={%s} distribute cannot be empty", sinkResponse));
+ throw new BusinessException(String.format("ClickHouse={%s} distribute cannot be empty", sinkResponse));
}
ClickHouseSinkInfo.PartitionStrategy partitionStrategy;
@@ -108,6 +112,15 @@ public class SinkInfoUtils {
sinkResponse.getWriteMaxRetryTimes());
}
+ // TODO Need set more configs for IcebergSinkInfo
+ private static IcebergSinkInfo createIcebergSinkInfo(IcebergSinkResponse sinkResponse, List<FieldInfo> sinkFields) {
+ if (StringUtils.isEmpty(sinkResponse.getDataPath())) {
+ throw new BusinessException(String.format("Iceberg={%s} data path cannot be empty", sinkResponse));
+ }
+
+ return new IcebergSinkInfo(sinkFields.toArray(new FieldInfo[0]), sinkResponse.getDataPath());
+ }
+
private static KafkaSinkInfo createKafkaSinkInfo(SourceResponse sourceResponse, KafkaSinkResponse sinkResponse,
List<FieldInfo> sinkFields) {
String addressUrl = sinkResponse.getBootstrapServers();
@@ -122,10 +135,10 @@ public class SinkInfoUtils {
*/
private static HiveSinkInfo createHiveSinkInfo(HiveSinkResponse hiveInfo, List<FieldInfo> sinkFields) {
if (hiveInfo.getJdbcUrl() == null) {
- throw new RuntimeException(String.format("HiveSink={%s} server url cannot be empty", hiveInfo));
+ throw new BusinessException(String.format("HiveSink={%s} server url cannot be empty", hiveInfo));
}
if (CollectionUtils.isEmpty(hiveInfo.getFieldList())) {
- throw new RuntimeException(String.format("HiveSink={%s} fields cannot be empty", hiveInfo));
+ throw new BusinessException(String.format("HiveSink={%s} fields cannot be empty", hiveInfo));
}
// Use the field separator in Hive, the default is TextFile
Character separator = (char) Integer.parseInt(hiveInfo.getDataSeparator());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index e5c106d8c..373f0eea7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableForStreamListener;
@@ -124,7 +124,7 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
String groupId = form.getInlongGroupId();
String streamId = form.getInlongStreamId();
- List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, Constant.SINK_HIVE,
+ List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
Collections.singletonList(streamId));
if (CollectionUtils.isEmpty(dsForHive)) {
log.warn("inlong group [{}] adn inlong stream [{}] does not have sink, skip create hive table", groupId,
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
index 9d9e75335..0484cd74f 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.core.sink;
import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
@@ -58,7 +59,7 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
sinkInfo.setSinkName(sinkName);
- sinkInfo.setSinkType(Constant.SINK_CLICKHOUSE);
+ sinkInfo.setSinkType(SinkType.SINK_CLICKHOUSE);
sinkInfo.setJdbcUrl(ckJdbcUrl);
sinkInfo.setUsername(ckUsername);
sinkInfo.setDatabaseName(ckDatabaseName);
@@ -69,19 +70,19 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
@After
public void deleteKafkaSink() {
- boolean result = sinkService.delete(sinkId, Constant.SINK_CLICKHOUSE, globalOperator);
+ boolean result = sinkService.delete(sinkId, SinkType.SINK_CLICKHOUSE, globalOperator);
Assert.assertTrue(result);
}
@Test
public void testListByIdentifier() {
- SinkResponse sink = sinkService.get(sinkId, Constant.SINK_CLICKHOUSE);
+ SinkResponse sink = sinkService.get(sinkId, SinkType.SINK_CLICKHOUSE);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
}
@Test
public void testGetAndUpdate() {
- SinkResponse response = sinkService.get(sinkId, Constant.SINK_CLICKHOUSE);
+ SinkResponse response = sinkService.get(sinkId, SinkType.SINK_CLICKHOUSE);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
ClickHouseSinkResponse kafkaSinkResponse = (ClickHouseSinkResponse) response;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
index bb51ef5b2..3f49f3dc4 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.core.sink;
import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
@@ -50,7 +51,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
HiveSinkRequest sinkInfo = new HiveSinkRequest();
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
- sinkInfo.setSinkType(Constant.SINK_HIVE);
+ sinkInfo.setSinkType(SinkType.SINK_HIVE);
sinkInfo.setEnableCreateResource(Constant.DISABLE_CREATE_RESOURCE);
sinkInfo.setSinkName(sinkName);
return sinkService.save(sinkInfo, globalOperator);
@@ -61,7 +62,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
Integer id = this.saveSink();
Assert.assertNotNull(id);
- boolean result = sinkService.delete(id, Constant.SINK_HIVE, globalOperator);
+ boolean result = sinkService.delete(id, SinkType.SINK_HIVE, globalOperator);
Assert.assertTrue(result);
}
@@ -69,16 +70,16 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
public void testListByIdentifier() {
Integer id = this.saveSink();
- SinkResponse sink = sinkService.get(id, Constant.SINK_HIVE);
+ SinkResponse sink = sinkService.get(id, SinkType.SINK_HIVE);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
- sinkService.delete(id, Constant.SINK_HIVE, globalOperator);
+ sinkService.delete(id, SinkType.SINK_HIVE, globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSink();
- SinkResponse response = sinkService.get(id, Constant.SINK_HIVE);
+ SinkResponse response = sinkService.get(id, SinkType.SINK_HIVE);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
HiveSinkResponse hiveResponse = (HiveSinkResponse) response;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
index b3ad88ca9..2cc955477 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.core.sink;
import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
@@ -46,7 +47,7 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
IcebergSinkRequest sinkInfo = new IcebergSinkRequest();
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
- sinkInfo.setSinkType(Constant.SINK_ICEBERG);
+ sinkInfo.setSinkType(SinkType.SINK_ICEBERG);
sinkInfo.setEnableCreateResource(Constant.DISABLE_CREATE_RESOURCE);
sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
sinkInfo.setSinkName(sinkName);
@@ -57,22 +58,22 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
public void testSaveAndDelete() {
Integer id = this.saveSink();
Assert.assertNotNull(id);
- boolean result = sinkService.delete(id, Constant.SINK_ICEBERG, globalOperator);
+ boolean result = sinkService.delete(id, SinkType.SINK_ICEBERG, globalOperator);
Assert.assertTrue(result);
}
@Test
public void testListByIdentifier() {
Integer id = this.saveSink();
- SinkResponse sink = sinkService.get(id, Constant.SINK_ICEBERG);
+ SinkResponse sink = sinkService.get(id, SinkType.SINK_ICEBERG);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
- sinkService.delete(id, Constant.SINK_ICEBERG, globalOperator);
+ sinkService.delete(id, SinkType.SINK_ICEBERG, globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSink();
- SinkResponse response = sinkService.get(id, Constant.SINK_ICEBERG);
+ SinkResponse response = sinkService.get(id, SinkType.SINK_ICEBERG);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
IcebergSinkResponse icebergSinkResponse = (IcebergSinkResponse) response;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
index 9d441349f..6f48f7812 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.core.sink;
import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
@@ -56,7 +57,7 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
KafkaSinkRequest sinkInfo = new KafkaSinkRequest();
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
- sinkInfo.setSinkType(Constant.SINK_KAFKA);
+ sinkInfo.setSinkType(SinkType.SINK_KAFKA);
sinkInfo.setSinkName(sinkName);
sinkInfo.setSerializationType(serializationType);
sinkInfo.setBootstrapServers(bootstrapServers);
@@ -67,19 +68,19 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
@After
public void deleteKafkaSink() {
- boolean result = sinkService.delete(kafkaSinkId, Constant.SINK_KAFKA, globalOperator);
+ boolean result = sinkService.delete(kafkaSinkId, SinkType.SINK_KAFKA, globalOperator);
Assert.assertTrue(result);
}
@Test
public void testListByIdentifier() {
- SinkResponse sink = sinkService.get(kafkaSinkId, Constant.SINK_KAFKA);
+ SinkResponse sink = sinkService.get(kafkaSinkId, SinkType.SINK_KAFKA);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
}
@Test
public void testGetAndUpdate() {
- SinkResponse response = sinkService.get(kafkaSinkId, Constant.SINK_KAFKA);
+ SinkResponse response = sinkService.get(kafkaSinkId, SinkType.SINK_KAFKA);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
KafkaSinkResponse kafkaSinkResponse = (KafkaSinkResponse) response;