You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/26 06:11:17 UTC
[incubator-inlong] branch master updated: [INLONG-2732][Manager] Support more parameters for Kafka source (#2739)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd4397 [INLONG-2732][Manager] Support more parameters for Kafka source (#2739)
1cd4397 is described below
commit 1cd4397b3cae3084814c0c410750bf8961249ceb
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Sat Feb 26 14:11:08 2022 +0800
[INLONG-2732][Manager] Support more parameters for Kafka source (#2739)
---
.../common/pojo/source/kafka/KafkaSourceDTO.java | 33 ++++++++++++++++------
.../pojo/source/kafka/KafkaSourceListResponse.java | 18 ++++++++----
.../pojo/source/kafka/KafkaSourceRequest.java | 23 ++++++++++++---
.../pojo/source/kafka/KafkaSourceResponse.java | 22 +++++++++++----
4 files changed, 71 insertions(+), 25 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 7173f34..f793223 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -40,23 +40,38 @@ public class KafkaSourceDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- @ApiModelProperty("Kafka bootstrap servers")
- private String address;
+ @ApiModelProperty("Kafka topic")
+ private String topic;
- @ApiModelProperty("Kafka topicName")
- private String topicName;
+ @ApiModelProperty("Kafka consumer group")
+ private String groupId;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
- private String serializationType;
+ @ApiModelProperty("Kafka servers address, such as: 127.0.0.1:9092")
+ private String bootstrapServers;
+
+ @ApiModelProperty(value = "Limit the amount of data read per second",
+ notes = "Greater than or equal to 0, equal to zero means no limit")
+ private String recordSpeedLimit;
+
+ @ApiModelProperty(value = "Limit the number of bytes read per second",
+ notes = "Greater than or equal to 0, equal to zero means no limit")
+ private String byteSpeedLimit;
+
+ @ApiModelProperty(value = "Topic partition offset",
+ notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+ private String topicPartitionOffset;
/**
* Get the dto instance from the request
*/
public static KafkaSourceDTO getFromRequest(KafkaSourceRequest request) {
return KafkaSourceDTO.builder()
- .address(request.getAddress())
- .topicName(request.getTopicName())
- .serializationType(request.getSerializationType())
+ .topic(request.getTopic())
+ .groupId(request.getGroupId())
+ .bootstrapServers(request.getBootstrapServers())
+ .recordSpeedLimit(request.getRecordSpeedLimit())
+ .byteSpeedLimit(request.getByteSpeedLimit())
+ .topicPartitionOffset(request.getTopicPartitionOffset())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 91f568b..96b8e4d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -31,13 +31,19 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
@ApiModel("Response of kafka source paging list")
public class KafkaSourceListResponse extends SourceListResponse {
- @ApiModelProperty("Kafka bootstrap servers")
- private String address;
+ @ApiModelProperty("Kafka topic")
+ private String topic;
- @ApiModelProperty("Kafka topicName")
- private String topicName;
+ @ApiModelProperty("Kafka consumer group")
+ private String groupId;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
- private String serializationType;
+ @ApiModelProperty("Kafka servers address")
+ private String bootstrapServers;
+
+ @ApiModelProperty("Limit the amount of data read per second")
+ private String recordSpeedLimit;
+
+ @ApiModelProperty("Limit the number of bytes read per second")
+ private String byteSpeedLimit;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 764c5c8..6c5d59b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -36,10 +36,25 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = Constant.SOURCE_KAFKA)
public class KafkaSourceRequest extends SourceRequest {
- @ApiModelProperty("Kafka bootstrap servers")
- private String address;
+ @ApiModelProperty("Kafka topic")
+ private String topic;
- @ApiModelProperty("Kafka topicName")
- private String topicName;
+ @ApiModelProperty("Kafka consumer group")
+ private String groupId;
+
+ @ApiModelProperty("Kafka servers address, such as: 127.0.0.1:9092")
+ private String bootstrapServers;
+
+ @ApiModelProperty(value = "Limit the amount of data read per second",
+ notes = "Greater than or equal to 0, equal to zero means no limit")
+ private String recordSpeedLimit;
+
+ @ApiModelProperty(value = "Limit the number of bytes read per second",
+ notes = "Greater than or equal to 0, equal to zero means no limit")
+ private String byteSpeedLimit;
+
+ @ApiModelProperty(value = "Topic partition offset",
+ notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+ private String topicPartitionOffset;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 34e1315..770e111 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -36,12 +36,22 @@ public class KafkaSourceResponse extends SourceResponse {
private String sourceType = Constant.SOURCE_KAFKA;
- @ApiModelProperty("Kafka bootstrap servers")
- private String address;
+ @ApiModelProperty("Kafka topic")
+ private String topic;
- @ApiModelProperty("Kafka topicName")
- private String topicName;
+ @ApiModelProperty("Kafka consumer group")
+ private String groupId;
+
+ @ApiModelProperty("Kafka servers address")
+ private String bootstrapServers;
+
+ @ApiModelProperty("Limit the amount of data read per second")
+ private String recordSpeedLimit;
+
+ @ApiModelProperty("Limit the number of bytes read per second")
+ private String byteSpeedLimit;
+
+ @ApiModelProperty("Topic partition offset")
+ private String topicPartitionOffset;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
- private String serializationType;
}