Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/07/16 06:55:10 UTC

[GitHub] [inlong] huyuanfeng2018 opened a new pull request, #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

huyuanfeng2018 opened a new pull request, #5079:
URL: https://github.com/apache/inlong/pull/5079

   ### Prepare a Pull Request
   *(Change the title, referring to the following example)*
   
   - [INLONG-5074][Sort] KafkaExtractNode support more StartupMode
   
   - Fixes #5074 
   
   ### Motivation
   
   KafkaExtractNode should support consuming from specified partitions and offsets.
   
   ### Verifying this change
   
   - [x] This change added tests and can be verified as follows:
     - *org.apache.inlong.sort.parser.KafkaSqlParseTest*
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] huyuanfeng2018 commented on a diff in pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
huyuanfeng2018 commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922774790


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java:
##########
@@ -25,7 +25,8 @@
  */
 public enum KafkaScanStartupMode {
     EARLIEST_OFFSET("earliest-offset"),
-    LATEST_OFFSET("latest-offset");
+    LATEST_OFFSET("latest-offset"),
+    SPECIFIC_OFFSET("specific-offsets");

Review Comment:
   This follows the API of the Flink community Kafka connector, which names the option value `specific-offsets`.





[GitHub] [inlong] healchow commented on a diff in pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922930847


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java:
##########
@@ -25,7 +25,8 @@
  */
 public enum KafkaScanStartupMode {
     EARLIEST_OFFSET("earliest-offset"),
-    LATEST_OFFSET("latest-offset");
+    LATEST_OFFSET("latest-offset"),
+    SPECIFIC_OFFSET("specific-offsets");

Review Comment:
   Maybe calling it `SPECIFIC_OFFSETS` is more explicit.
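
The naming question above concerns two separate things: the Java constant name and the option string passed to the connector. A minimal sketch of how the two can be kept apart is shown below. `StartupModeSketch`, `getValue` and `forValue` are hypothetical names chosen for illustration and are not taken from the InLong code base; only the three option strings come from the diff.

```java
// Illustrative sketch only: StartupModeSketch is a hypothetical stand-in,
// not the actual KafkaScanStartupMode class from InLong.
enum StartupModeSketch {
    EARLIEST_OFFSET("earliest-offset"),
    LATEST_OFFSET("latest-offset"),
    // Plural constant name, matching the plural option string.
    SPECIFIC_OFFSETS("specific-offsets");

    // The option string handed to the connector.
    private final String value;

    StartupModeSketch(String value) {
        this.value = value;
    }

    String getValue() {
        return value;
    }

    // Looks up a constant by its option string (hypothetical helper).
    static StartupModeSketch forValue(String value) {
        for (StartupModeSketch mode : values()) {
            if (mode.value.equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported startup mode: " + value);
    }
}
```

With this shape, renaming the constant does not change the string the connector sees, so the rename suggested in the review would stay a source-level change.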



##########
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSource.java:
##########
@@ -58,11 +58,12 @@ public class KafkaSource extends StreamSource {
     private String byteSpeedLimit;
 
     @ApiModelProperty(value = "Topic partition offset",
-            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
-    private String topicPartitionOffset;
+            notes = "For example,'partition:0,offset:42;partition:1,offset:300' "
+                    + "indicates offset 42 for partition 0 and offset 300 for partition 1.")
+    private String partitionOffset;

Review Comment:
   Please change this param in `KafkaSourceDTO` and `KafkaSourceRequest` at the same time.
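
To make the new string format in the diff above concrete, here is a minimal sketch of a parser for values such as `partition:0,offset:42;partition:1,offset:300`. `PartitionOffsetParser` is a hypothetical helper written for this illustration. It is not part of InLong, and its strictness (rejecting duplicates and malformed entries) is an assumption rather than documented behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of InLong: parses the partition offset
// string format quoted in the diff into a partition -> offset map.
final class PartitionOffsetParser {

    private PartitionOffsetParser() {
    }

    static Map<Integer, Long> parse(String spec) {
        if (spec == null || spec.trim().isEmpty()) {
            throw new IllegalArgumentException("partition offset spec is empty");
        }
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        // Entries are separated by ';', e.g. "partition:0,offset:42".
        for (String entry : spec.split(";")) {
            String[] fields = entry.trim().split(",");
            if (fields.length != 2) {
                throw new IllegalArgumentException("malformed entry: '" + entry + "'");
            }
            int partition = Integer.parseInt(valueFor(fields[0], "partition"));
            long offset = Long.parseLong(valueFor(fields[1], "offset"));
            // Assumption: a partition may appear only once.
            if (offsets.put(partition, offset) != null) {
                throw new IllegalArgumentException("duplicate partition: " + partition);
            }
        }
        return offsets;
    }

    // Returns the text after "<expectedKey>:" or fails if the key differs.
    private static String valueFor(String field, String expectedKey) {
        String[] kv = field.trim().split(":");
        if (kv.length != 2 || !kv[0].trim().equals(expectedKey)) {
            throw new IllegalArgumentException(
                    "expected '" + expectedKey + ":<number>' but got '" + field + "'");
        }
        return kv[1].trim();
    }
}
```

For the example in the annotation, `PartitionOffsetParser.parse(...)` would yield offset 42 for partition 0 and offset 300 for partition 1.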



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java:
##########
@@ -35,6 +35,8 @@ public class KafkaConstant {
 
     public static final String KAFKA = "kafka";
 
+    public static final String SCAN_STARTUP_SPECIFIC_OFFSET = "scan.startup.specific-offsets";

Review Comment:
   Maybe calling it `SCAN_STARTUP_SPECIFIC_OFFSETS` is more explicit.



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java:
##########
@@ -75,6 +75,9 @@ public class KafkaExtractNode extends ExtractNode implements Metadata, Serializa
     @JsonProperty("groupId")
     private String groupId;
 
+    @JsonProperty("kafkaScanSpecificOffset")
+    private String kafkaScanSpecificOffset;

Review Comment:
   This is a Kafka extract node, so just calling it `scanSpecificOffset` is enough.





[GitHub] [inlong] huyuanfeng2018 commented on a diff in pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
huyuanfeng2018 commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922984430


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java:
##########
@@ -75,6 +75,9 @@ public class KafkaExtractNode extends ExtractNode implements Metadata, Serializa
     @JsonProperty("groupId")
     private String groupId;
 
+    @JsonProperty("kafkaScanSpecificOffset")
+    private String kafkaScanSpecificOffset;

Review Comment:
   OK, thanks for your advice





[GitHub] [inlong] healchow commented on pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
healchow commented on PR #5079:
URL: https://github.com/apache/inlong/pull/5079#issuecomment-1186216908

   @huyuanfeng2018 Hello, there are some conflicts, and the `KafkaSqlParseTest` has no Apache License header, please take a look, thanks.




[GitHub] [inlong] healchow commented on a diff in pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922690233


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java:
##########
@@ -25,7 +25,8 @@
  */
 public enum KafkaScanStartupMode {
     EARLIEST_OFFSET("earliest-offset"),
-    LATEST_OFFSET("latest-offset");
+    LATEST_OFFSET("latest-offset"),
+    SPECIFIC_OFFSET("specific-offsets");

Review Comment:
   Why does this use `offsets` not `offset`?





[GitHub] [inlong] healchow commented on pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
healchow commented on PR #5079:
URL: https://github.com/apache/inlong/pull/5079#issuecomment-1186215720

   @yunqingmoswu @EMsnap PTAL, tks.




[GitHub] [inlong] dockerzhang commented on pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on PR #5079:
URL: https://github.com/apache/inlong/pull/5079#issuecomment-1186456973

   @gong @EMsnap PTAL, thanks.




[GitHub] [inlong] huyuanfeng2018 commented on pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
huyuanfeng2018 commented on PR #5079:
URL: https://github.com/apache/inlong/pull/5079#issuecomment-1186412420

   
   
   > @huyuanfeng2018 Hello, there are some conflicts, and the `KafkaSqlParseTest` has no Apache License header, please take a look, thanks.
   
   Hi, I have resolved the conflicts and added the Apache License header to the test class. Please have another look, thanks.




[GitHub] [inlong] huyuanfeng2018 commented on a diff in pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
huyuanfeng2018 commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922988164


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java:
##########
@@ -86,14 +89,19 @@ public KafkaExtractNode(@JsonProperty("id") String id,
             @Nonnull @JsonProperty("format") Format format,
             @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode,
             @JsonProperty("primaryKey") String primaryKey,
-            @JsonProperty("groupId") String groupId) {
+            @JsonProperty("groupId") String groupId,
+            @JsonProperty("kafkaScanSpecificOffset") String kafkaScanSpecificOffset) {
         super(id, name, fields, watermarkField, properties);
         this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
         this.format = Preconditions.checkNotNull(format, "kafka format is empty");
         this.kafkaScanStartupMode = Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty");
         this.primaryKey = primaryKey;
         this.groupId = groupId;
+        if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSET) {
+            this.kafkaScanSpecificOffset
+                    = Preconditions.checkNotNull(kafkaScanSpecificOffset, "kafkaScanSpecificOffset is empty");

Review Comment:
   
   `Preconditions` here comes from the Guava library, which has no `checkNotEmpty` method, so I can only change it like this:
   ```
   Preconditions.checkArgument(StringUtils.isNotEmpty(scanSpecificOffsets), "scanSpecificOffsets is empty");
   ```
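
The difference between the two checks discussed here can be sketched as follows. To stay self-contained, the sketch uses plain JDK stand-ins: `Objects.requireNonNull` plays the role of Guava's `Preconditions.checkNotNull`, and a hand-written test replaces `Preconditions.checkArgument(StringUtils.isNotEmpty(...))`. The class and method names are hypothetical.

```java
import java.util.Objects;

// Illustrative sketch with JDK stand-ins for the Guava / Commons Lang calls
// mentioned in the review; OffsetValidationSketch is a hypothetical name.
final class OffsetValidationSketch {

    private OffsetValidationSketch() {
    }

    // Null check only: an empty string passes through unchanged.
    static String requireNonNullOnly(String scanSpecificOffsets) {
        return Objects.requireNonNull(scanSpecificOffsets, "scanSpecificOffsets is empty");
    }

    // Null-or-empty check: rejects both null and "".
    static String requireNotEmpty(String scanSpecificOffsets) {
        if (scanSpecificOffsets == null || scanSpecificOffsets.isEmpty()) {
            throw new IllegalArgumentException("scanSpecificOffsets is empty");
        }
        return scanSpecificOffsets;
    }
}
```

A null-only check would accept `""` and defer the failure to wherever the offsets are later used, which is the case the reviewer's suggestion is meant to rule out.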
   





[GitHub] [inlong] EMsnap commented on a diff in pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922933408


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java:
##########
@@ -86,14 +89,19 @@ public KafkaExtractNode(@JsonProperty("id") String id,
             @Nonnull @JsonProperty("format") Format format,
             @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode,
             @JsonProperty("primaryKey") String primaryKey,
-            @JsonProperty("groupId") String groupId) {
+            @JsonProperty("groupId") String groupId,
+            @JsonProperty("kafkaScanSpecificOffset") String kafkaScanSpecificOffset) {
         super(id, name, fields, watermarkField, properties);
         this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
         this.format = Preconditions.checkNotNull(format, "kafka format is empty");
         this.kafkaScanStartupMode = Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty");
         this.primaryKey = primaryKey;
         this.groupId = groupId;
+        if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSET) {
+            this.kafkaScanSpecificOffset
+                    = Preconditions.checkNotNull(kafkaScanSpecificOffset, "kafkaScanSpecificOffset is empty");

Review Comment:
   A `checkNotEmpty`-style check is better when the parameter is a string.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922693187


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java:
##########
@@ -25,7 +25,8 @@
  */
 public enum KafkaScanStartupMode {
     EARLIEST_OFFSET("earliest-offset"),
-    LATEST_OFFSET("latest-offset");
+    LATEST_OFFSET("latest-offset"),
+    SPECIFIC_OFFSET("specific-offsets");

Review Comment:
   It is controlled by the Sort connector.





[GitHub] [inlong] EMsnap merged pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
EMsnap merged PR #5079:
URL: https://github.com/apache/inlong/pull/5079




[GitHub] [inlong] yunqingmoswu commented on pull request #5079: [INLONG-5074][Sort] KafkaExtractNode support more StartupMode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on PR #5079:
URL: https://github.com/apache/inlong/pull/5079#issuecomment-1186222786

   > @yunqingmoswu @EMsnap PTAL, tks.
   
   It looks fine once the comments are handled.

