You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/11 13:59:00 UTC

[incubator-inlong] branch master updated: [INLONG-3068][Manager] Add autoOffsetReset param for Kafka source (#3069)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cb1fa8  [INLONG-3068][Manager] Add autoOffsetReset param for Kafka source (#3069)
4cb1fa8 is described below

commit 4cb1fa85a34e596c6971b09b25fd05e1e520cd61
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 11 21:58:33 2022 +0800

    [INLONG-3068][Manager] Add autoOffsetReset param for Kafka source (#3069)
    
    * [INLONG-3068][Manager] Add autoOffsetReset param for Kafka source
    
    * [INLONG-3068][Manager] Add enum for Kafka autoOffsetReset param
---
 .../inlong/manager/client/api/DataFormat.java      |  8 ++++--
 .../api/{DataFormat.java => KafkaOffset.java}      | 23 +++++++++--------
 .../manager/client/api/source/KafkaSource.java     |  6 +++++
 .../api/util/InlongStreamSourceTransfer.java       | 30 +++++++++++++---------
 .../common/pojo/source/kafka/KafkaSourceDTO.java   |  8 ++++++
 .../pojo/source/kafka/KafkaSourceListResponse.java |  5 ++++
 .../pojo/source/kafka/KafkaSourceRequest.java      | 12 ++++++---
 .../pojo/source/kafka/KafkaSourceResponse.java     | 11 +++++---
 8 files changed, 71 insertions(+), 32 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
index 07ac2a9..2c6e53e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
@@ -21,7 +21,11 @@ import lombok.Getter;
 
 import java.util.Locale;
 
+/**
+ * Enum of data format.
+ */
 public enum DataFormat {
+
     CSV("csv"),
     AVRO("avro"),
     CANAL("canal"),
@@ -29,7 +33,7 @@ public enum DataFormat {
     NONE("none");
 
     @Getter
-    private String name;
+    private final String name;
 
     DataFormat(String name) {
         this.name = name;
@@ -41,6 +45,6 @@ public enum DataFormat {
                 return dataFormat;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupport dataformat=%s for Inlong", name));
+        throw new IllegalArgumentException(String.format("Unsupported DataFormat=%s for Inlong", name));
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/KafkaOffset.java
similarity index 77%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/KafkaOffset.java
index 07ac2a9..d64c50d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/KafkaOffset.java
@@ -21,26 +21,29 @@ import lombok.Getter;
 
 import java.util.Locale;
 
-public enum DataFormat {
-    CSV("csv"),
-    AVRO("avro"),
-    CANAL("canal"),
-    JSON("json"),
+/**
+ * Enum of auto offset reset strategy of Kafka.
+ */
+public enum KafkaOffset {
+
+    EARLIEST("earliest"),
+    LATEST("latest"),
     NONE("none");
 
     @Getter
-    private String name;
+    private final String name;
 
-    DataFormat(String name) {
+    KafkaOffset(String name) {
         this.name = name;
     }
 
-    public static DataFormat forName(String name) {
-        for (DataFormat dataFormat : values()) {
+    public static KafkaOffset forName(String name) {
+        for (KafkaOffset dataFormat : values()) {
             if (dataFormat.getName().equals(name.toLowerCase(Locale.ROOT))) {
                 return dataFormat;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupport dataformat=%s for Inlong", name));
+        throw new IllegalArgumentException(String.format("Unsupported KafkaOffset=%s for Inlong", name));
     }
+
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index 45b3249..1e64253 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -21,12 +21,15 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.KafkaOffset;
 import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
+@EqualsAndHashCode(callSuper = true)
 @AllArgsConstructor
 @NoArgsConstructor
 @ApiModel("Base configuration for Kafka collection")
@@ -62,4 +65,7 @@ public class KafkaSource extends StreamSource {
             notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
     private String topicPartitionOffset;
 
+    @ApiModelProperty(value = "The strategy of auto offset reset")
+    private KafkaOffset autoOffsetReset;
+
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 8276f48..9b58ddc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.client.api.StreamSource.SyncType;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
 import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.client.api.KafkaOffset;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -53,7 +54,7 @@ public class InlongStreamSourceTransfer {
             case BINLOG:
                 return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
             default:
-                throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
+                throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
         }
     }
 
@@ -66,7 +67,7 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.BINLOG && sourceResponse instanceof BinlogSourceResponse) {
             return parseMySQLBinlogSource((BinlogSourceResponse) sourceResponse);
         }
-        throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
+        throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
     public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
@@ -78,7 +79,7 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
             return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
         }
-        throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
+        throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
     private static KafkaSource parseKafkaSource(KafkaSourceResponse kafkaSourceResponse) {
@@ -97,17 +98,21 @@ public class InlongStreamSourceTransfer {
         return kafkaSource;
     }
 
-    private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaSourceResponse) {
+    private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaResponse) {
         KafkaSource kafkaSource = new KafkaSource();
-        kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
-        kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
-        DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
+        kafkaSource.setSourceName(kafkaResponse.getSourceName());
+        kafkaSource.setConsumerGroup(kafkaResponse.getGroupId());
+
+        DataFormat dataFormat = DataFormat.forName(kafkaResponse.getSerializationType());
         kafkaSource.setDataFormat(dataFormat);
-        kafkaSource.setTopic(kafkaSourceResponse.getTopic());
-        kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
-        kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
-        kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
-        kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+        kafkaSource.setTopic(kafkaResponse.getTopic());
+        kafkaSource.setBootstrapServers(kafkaResponse.getBootstrapServers());
+        kafkaSource.setByteSpeedLimit(kafkaResponse.getByteSpeedLimit());
+        kafkaSource.setTopicPartitionOffset(kafkaResponse.getTopicPartitionOffset());
+
+        KafkaOffset offset = KafkaOffset.forName(kafkaResponse.getAutoOffsetReset());
+        kafkaSource.setAutoOffsetReset(offset);
+        kafkaSource.setRecordSpeedLimit(kafkaResponse.getRecordSpeedLimit());
         kafkaSource.setSyncType(SyncType.FULL);
         return kafkaSource;
     }
@@ -175,6 +180,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setRecordSpeedLimit(kafkaSource.getRecordSpeedLimit());
         sourceRequest.setByteSpeedLimit(kafkaSource.getByteSpeedLimit());
         sourceRequest.setTopicPartitionOffset(kafkaSource.getTopicPartitionOffset());
+        sourceRequest.setAutoOffsetReset(kafkaSource.getAutoOffsetReset().getName());
         sourceRequest.setGroupId(kafkaSource.getConsumerGroup());
         sourceRequest.setSerializationType(kafkaSource.getDataFormat().getName());
         return sourceRequest;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 2b81870..f4fb9bb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -61,6 +61,13 @@ public class KafkaSourceDTO {
             notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
     private String topicPartitionOffset;
 
+    /**
+     * @see <a href="https://docs.confluent.io/platform/current/clients/consumer.html">Kafka_consumer_config</a>
+     */
+    @ApiModelProperty(value = "The strategy of auto offset reset",
+            notes = "including earliest, latest (the default), none")
+    private String autoOffsetReset;
+
     @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
     private String serializationType;
 
@@ -75,6 +82,7 @@ public class KafkaSourceDTO {
                 .recordSpeedLimit(request.getRecordSpeedLimit())
                 .byteSpeedLimit(request.getByteSpeedLimit())
                 .topicPartitionOffset(request.getTopicPartitionOffset())
+                .autoOffsetReset(request.getAutoOffsetReset())
                 .serializationType(request.getSerializationType())
                 .build();
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 318eabb..9190bd0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -52,4 +52,9 @@ public class KafkaSourceListResponse extends SourceListResponse {
     @ApiModelProperty(value = "Topic partition offset",
             notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
     private String topicPartitionOffset;
+
+    @ApiModelProperty(value = "The strategy of auto offset reset",
+            notes = "including earliest, latest (the default), none")
+    private String autoOffsetReset;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 7485cfd..84110d1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -37,10 +37,6 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = Constant.SOURCE_KAFKA)
 public class KafkaSourceRequest extends SourceRequest {
 
-    public KafkaSourceRequest() {
-        this.setSourceType(SourceType.KAFKA.toString());
-    }
-
     @ApiModelProperty("Kafka topic")
     private String topic;
 
@@ -62,4 +58,12 @@ public class KafkaSourceRequest extends SourceRequest {
             notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
     private String topicPartitionOffset;
 
+    @ApiModelProperty(value = "The strategy of auto offset reset",
+            notes = "including earliest, latest (the default), none")
+    private String autoOffsetReset;
+
+    public KafkaSourceRequest() {
+        this.setSourceType(SourceType.KAFKA.toString());
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 7879032..34b178b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -34,10 +34,6 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 @ApiModel(value = "Response of the kafka source")
 public class KafkaSourceResponse extends SourceResponse {
 
-    public KafkaSourceResponse() {
-        this.setSourceType(SourceType.KAFKA.name());
-    }
-
     @ApiModelProperty("Kafka topic")
     private String topic;
 
@@ -56,4 +52,11 @@ public class KafkaSourceResponse extends SourceResponse {
     @ApiModelProperty("Topic partition offset")
     private String topicPartitionOffset;
 
+    @ApiModelProperty(value = "The strategy of auto offset reset")
+    private String autoOffsetReset;
+
+    public KafkaSourceResponse() {
+        this.setSourceType(SourceType.KAFKA.name());
+    }
+
 }