You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/26 06:11:17 UTC

[incubator-inlong] branch master updated: [INLONG-2732][Manager] Support more parameters for Kafka source (#2739)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd4397  [INLONG-2732][Manager] Support more parameters for Kafka source (#2739)
1cd4397 is described below

commit 1cd4397b3cae3084814c0c410750bf8961249ceb
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Sat Feb 26 14:11:08 2022 +0800

    [INLONG-2732][Manager] Support more parameters for Kafka source (#2739)
---
 .../common/pojo/source/kafka/KafkaSourceDTO.java   | 33 ++++++++++++++++------
 .../pojo/source/kafka/KafkaSourceListResponse.java | 18 ++++++++----
 .../pojo/source/kafka/KafkaSourceRequest.java      | 23 ++++++++++++---
 .../pojo/source/kafka/KafkaSourceResponse.java     | 22 +++++++++++----
 4 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 7173f34..f793223 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -40,23 +40,38 @@ public class KafkaSourceDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    @ApiModelProperty("Kafka bootstrap servers")
-    private String address;
+    @ApiModelProperty("Kafka topic")
+    private String topic;
 
-    @ApiModelProperty("Kafka topicName")
-    private String topicName;
+    @ApiModelProperty("Kafka consumer group")
+    private String groupId;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
-    private String serializationType;
+    @ApiModelProperty("Kafka servers address, such as: 127.0.0.1:9092")
+    private String bootstrapServers;
+
+    @ApiModelProperty(value = "Limit the amount of data read per second",
+            notes = "Greater than or equal to 0, equal to zero means no limit")
+    private String recordSpeedLimit;
+
+    @ApiModelProperty(value = "Limit the number of bytes read per second",
+            notes = "Greater than or equal to 0, equal to zero means no limit")
+    private String byteSpeedLimit;
+
+    @ApiModelProperty(value = "Topic partition offset",
+            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+    private String topicPartitionOffset;
 
     /**
      * Get the dto instance from the request
      */
     public static KafkaSourceDTO getFromRequest(KafkaSourceRequest request) {
         return KafkaSourceDTO.builder()
-                .address(request.getAddress())
-                .topicName(request.getTopicName())
-                .serializationType(request.getSerializationType())
+                .topic(request.getTopic())
+                .groupId(request.getGroupId())
+                .bootstrapServers(request.getBootstrapServers())
+                .recordSpeedLimit(request.getRecordSpeedLimit())
+                .byteSpeedLimit(request.getByteSpeedLimit())
+                .topicPartitionOffset(request.getTopicPartitionOffset())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 91f568b..96b8e4d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -31,13 +31,19 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 @ApiModel("Response of kafka source paging list")
 public class KafkaSourceListResponse extends SourceListResponse {
 
-    @ApiModelProperty("Kafka bootstrap servers")
-    private String address;
+    @ApiModelProperty("Kafka topic")
+    private String topic;
 
-    @ApiModelProperty("Kafka topicName")
-    private String topicName;
+    @ApiModelProperty("Kafka consumer group")
+    private String groupId;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
-    private String serializationType;
+    @ApiModelProperty("Kafka servers address")
+    private String bootstrapServers;
+
+    @ApiModelProperty("Limit the amount of data read per second")
+    private String recordSpeedLimit;
+
+    @ApiModelProperty("Limit the number of bytes read per second")
+    private String byteSpeedLimit;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 764c5c8..6c5d59b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -36,10 +36,25 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = Constant.SOURCE_KAFKA)
 public class KafkaSourceRequest extends SourceRequest {
 
-    @ApiModelProperty("Kafka bootstrap servers")
-    private String address;
+    @ApiModelProperty("Kafka topic")
+    private String topic;
 
-    @ApiModelProperty("Kafka topicName")
-    private String topicName;
+    @ApiModelProperty("Kafka consumer group")
+    private String groupId;
+
+    @ApiModelProperty("Kafka servers address, such as: 127.0.0.1:9092")
+    private String bootstrapServers;
+
+    @ApiModelProperty(value = "Limit the amount of data read per second",
+            notes = "Greater than or equal to 0, equal to zero means no limit")
+    private String recordSpeedLimit;
+
+    @ApiModelProperty(value = "Limit the number of bytes read per second",
+            notes = "Greater than or equal to 0, equal to zero means no limit")
+    private String byteSpeedLimit;
+
+    @ApiModelProperty(value = "Topic partition offset",
+            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+    private String topicPartitionOffset;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 34e1315..770e111 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -36,12 +36,22 @@ public class KafkaSourceResponse extends SourceResponse {
 
     private String sourceType = Constant.SOURCE_KAFKA;
 
-    @ApiModelProperty("Kafka bootstrap servers")
-    private String address;
+    @ApiModelProperty("Kafka topic")
+    private String topic;
 
-    @ApiModelProperty("Kafka topicName")
-    private String topicName;
+    @ApiModelProperty("Kafka consumer group")
+    private String groupId;
+
+    @ApiModelProperty("Kafka servers address")
+    private String bootstrapServers;
+
+    @ApiModelProperty("Limit the amount of data read per second")
+    private String recordSpeedLimit;
+
+    @ApiModelProperty("Limit the number of bytes read per second")
+    private String byteSpeedLimit;
+
+    @ApiModelProperty("Topic partition offset")
+    private String topicPartitionOffset;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
-    private String serializationType;
 }