You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/11 13:59:00 UTC
[incubator-inlong] branch master updated: [INLONG-3068][Manager] Add autoOffsetReset param for Kafka source (#3069)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4cb1fa8 [INLONG-3068][Manager] Add autoOffsetReset param for Kafka source (#3069)
4cb1fa8 is described below
commit 4cb1fa85a34e596c6971b09b25fd05e1e520cd61
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 11 21:58:33 2022 +0800
[INLONG-3068][Manager] Add autoOffsetReset param for Kafka source (#3069)
* [INLONG-3068][Manager] Add autoOffsetReset param for Kafka source
* [INLONG-3068][Manager] Add enum for Kafka autoOffsetReset param
---
.../inlong/manager/client/api/DataFormat.java | 8 ++++--
.../api/{DataFormat.java => KafkaOffset.java} | 23 +++++++++--------
.../manager/client/api/source/KafkaSource.java | 6 +++++
.../api/util/InlongStreamSourceTransfer.java | 30 +++++++++++++---------
.../common/pojo/source/kafka/KafkaSourceDTO.java | 8 ++++++
.../pojo/source/kafka/KafkaSourceListResponse.java | 5 ++++
.../pojo/source/kafka/KafkaSourceRequest.java | 12 ++++++---
.../pojo/source/kafka/KafkaSourceResponse.java | 11 +++++---
8 files changed, 71 insertions(+), 32 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
index 07ac2a9..2c6e53e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
@@ -21,7 +21,11 @@ import lombok.Getter;
import java.util.Locale;
+/**
+ * Enum of data format.
+ */
public enum DataFormat {
+
CSV("csv"),
AVRO("avro"),
CANAL("canal"),
@@ -29,7 +33,7 @@ public enum DataFormat {
NONE("none");
@Getter
- private String name;
+ private final String name;
DataFormat(String name) {
this.name = name;
@@ -41,6 +45,6 @@ public enum DataFormat {
return dataFormat;
}
}
- throw new IllegalArgumentException(String.format("Unsupport dataformat=%s for Inlong", name));
+ throw new IllegalArgumentException(String.format("Unsupported DataFormat=%s for Inlong", name));
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/KafkaOffset.java
similarity index 77%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/KafkaOffset.java
index 07ac2a9..d64c50d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/KafkaOffset.java
@@ -21,26 +21,29 @@ import lombok.Getter;
import java.util.Locale;
-public enum DataFormat {
- CSV("csv"),
- AVRO("avro"),
- CANAL("canal"),
- JSON("json"),
+/**
+ * Enum of the Kafka auto offset reset strategies.
+ */
+public enum KafkaOffset {
+
+ EARLIEST("earliest"),
+ LATEST("latest"),
NONE("none");
@Getter
- private String name;
+ private final String name;
- DataFormat(String name) {
+ KafkaOffset(String name) {
this.name = name;
}
- public static DataFormat forName(String name) {
- for (DataFormat dataFormat : values()) {
+ public static KafkaOffset forName(String name) {
+ for (KafkaOffset dataFormat : values()) {
if (dataFormat.getName().equals(name.toLowerCase(Locale.ROOT))) {
return dataFormat;
}
}
- throw new IllegalArgumentException(String.format("Unsupport dataformat=%s for Inlong", name));
+ throw new IllegalArgumentException(String.format("Unsupported KafkaOffset=%s for Inlong", name));
}
+
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index 45b3249..1e64253 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -21,12 +21,15 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.KafkaOffset;
import org.apache.inlong.manager.common.enums.SourceType;
@Data
+@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("Base configuration for Kafka collection")
@@ -62,4 +65,7 @@ public class KafkaSource extends StreamSource {
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
private String topicPartitionOffset;
+ @ApiModelProperty(value = "The strategy of auto offset reset")
+ private KafkaOffset autoOffsetReset;
+
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 8276f48..9b58ddc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.client.api.StreamSource.SyncType;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.client.api.KafkaOffset;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -53,7 +54,7 @@ public class InlongStreamSourceTransfer {
case BINLOG:
return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
default:
- throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
+ throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
}
}
@@ -66,7 +67,7 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.BINLOG && sourceResponse instanceof BinlogSourceResponse) {
return parseMySQLBinlogSource((BinlogSourceResponse) sourceResponse);
}
- throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
+ throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
@@ -78,7 +79,7 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
}
- throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
+ throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
private static KafkaSource parseKafkaSource(KafkaSourceResponse kafkaSourceResponse) {
@@ -97,17 +98,21 @@ public class InlongStreamSourceTransfer {
return kafkaSource;
}
- private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaSourceResponse) {
+ private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaResponse) {
KafkaSource kafkaSource = new KafkaSource();
- kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
- kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
- DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
+ kafkaSource.setSourceName(kafkaResponse.getSourceName());
+ kafkaSource.setConsumerGroup(kafkaResponse.getGroupId());
+
+ DataFormat dataFormat = DataFormat.forName(kafkaResponse.getSerializationType());
kafkaSource.setDataFormat(dataFormat);
- kafkaSource.setTopic(kafkaSourceResponse.getTopic());
- kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
- kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
- kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
- kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+ kafkaSource.setTopic(kafkaResponse.getTopic());
+ kafkaSource.setBootstrapServers(kafkaResponse.getBootstrapServers());
+ kafkaSource.setByteSpeedLimit(kafkaResponse.getByteSpeedLimit());
+ kafkaSource.setTopicPartitionOffset(kafkaResponse.getTopicPartitionOffset());
+
+ KafkaOffset offset = KafkaOffset.forName(kafkaResponse.getAutoOffsetReset());
+ kafkaSource.setAutoOffsetReset(offset);
+ kafkaSource.setRecordSpeedLimit(kafkaResponse.getRecordSpeedLimit());
kafkaSource.setSyncType(SyncType.FULL);
return kafkaSource;
}
@@ -175,6 +180,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setRecordSpeedLimit(kafkaSource.getRecordSpeedLimit());
sourceRequest.setByteSpeedLimit(kafkaSource.getByteSpeedLimit());
sourceRequest.setTopicPartitionOffset(kafkaSource.getTopicPartitionOffset());
+ sourceRequest.setAutoOffsetReset(kafkaSource.getAutoOffsetReset().getName());
sourceRequest.setGroupId(kafkaSource.getConsumerGroup());
sourceRequest.setSerializationType(kafkaSource.getDataFormat().getName());
return sourceRequest;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 2b81870..f4fb9bb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -61,6 +61,13 @@ public class KafkaSourceDTO {
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
private String topicPartitionOffset;
+ /**
+ * @see <a href="https://docs.confluent.io/platform/current/clients/consumer.html">Kafka_consumer_config</a>
+ */
+ @ApiModelProperty(value = "The strategy of auto offset reset",
+ notes = "including earliest, latest (the default), none")
+ private String autoOffsetReset;
+
@ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
private String serializationType;
@@ -75,6 +82,7 @@ public class KafkaSourceDTO {
.recordSpeedLimit(request.getRecordSpeedLimit())
.byteSpeedLimit(request.getByteSpeedLimit())
.topicPartitionOffset(request.getTopicPartitionOffset())
+ .autoOffsetReset(request.getAutoOffsetReset())
.serializationType(request.getSerializationType())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 318eabb..9190bd0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -52,4 +52,9 @@ public class KafkaSourceListResponse extends SourceListResponse {
@ApiModelProperty(value = "Topic partition offset",
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
private String topicPartitionOffset;
+
+ @ApiModelProperty(value = "The strategy of auto offset reset",
+ notes = "including earliest, latest (the default), none")
+ private String autoOffsetReset;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 7485cfd..84110d1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -37,10 +37,6 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = Constant.SOURCE_KAFKA)
public class KafkaSourceRequest extends SourceRequest {
- public KafkaSourceRequest() {
- this.setSourceType(SourceType.KAFKA.toString());
- }
-
@ApiModelProperty("Kafka topic")
private String topic;
@@ -62,4 +58,12 @@ public class KafkaSourceRequest extends SourceRequest {
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
private String topicPartitionOffset;
+ @ApiModelProperty(value = "The strategy of auto offset reset",
+ notes = "including earliest, latest (the default), none")
+ private String autoOffsetReset;
+
+ public KafkaSourceRequest() {
+ this.setSourceType(SourceType.KAFKA.toString());
+ }
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 7879032..34b178b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -34,10 +34,6 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@ApiModel(value = "Response of the kafka source")
public class KafkaSourceResponse extends SourceResponse {
- public KafkaSourceResponse() {
- this.setSourceType(SourceType.KAFKA.name());
- }
-
@ApiModelProperty("Kafka topic")
private String topic;
@@ -56,4 +52,11 @@ public class KafkaSourceResponse extends SourceResponse {
@ApiModelProperty("Topic partition offset")
private String topicPartitionOffset;
+ @ApiModelProperty(value = "The strategy of auto offset reset")
+ private String autoOffsetReset;
+
+ public KafkaSourceResponse() {
+ this.setSourceType(SourceType.KAFKA.name());
+ }
+
}