Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/21 07:28:41 UTC

[GitHub] [incubator-seatunnel] Carl-Zhou-CN opened a new pull request, #3157: Support setting read starting offset or time at startup config

Carl-Zhou-CN opened a new pull request, #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157

   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changed are covered with tests, or it does not need tests for reason:
   * [ ] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   
   close #2959 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#discussion_r1005471664


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java:
##########
@@ -91,20 +122,20 @@ public void close() throws IOException {
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(convertToNextSplit(splits));
+            pendingSplit.putAll(convertToNextSplit(splits));
             assignSplit();
         }
     }
 
-    private Collection<? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
+    private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
-                getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            Map<TopicPartition, Long> listOffsets =
+                listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.earliest());

Review Comment:
   Why use `OffsetSpec.earliest()` here rather than `OffsetSpec.latest()`?





[GitHub] [incubator-seatunnel] Carl-Zhou-CN commented on pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
Carl-Zhou-CN commented on PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#issuecomment-1291781148

   @Hisoka-X @TyrantLucifer Please help review this.




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#discussion_r1010060457


##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -66,6 +69,24 @@ The structure of the data, including field names and field types.
 Data format. The default format is json. Optional text format. The default field separator is ", ".
 If you customize the delimiter, add the "field_delimiter" option.
 
+## start_mode
+The initial consumption pattern of consumers,there are several types:
+[earliest],[group_offsets],[latest],[specific_offsets],[timestamp]
+
+## start_mode.timestamp
+The time required for consumption mode to be timestamp
+
+##  start_mode.offsets
+The offset required for consumption mode to be specific_offsets
+for example:
+```hocon
+   start_mode.offsets = {
+            info-0 = 70
+            info-1 = 10
+            info-2 = 10
+         }
+```
+

Review Comment:
   Please add a `changelog` entry. For reference, see https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/source/Redis.md#next-version
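
The five `start_mode` values listed in the documentation diff above can be illustrated with a small, self-contained sketch. This is not the connector's code: the class `StartModeSketch`, the `PartitionOffsets` holder, and both fallback rules (a missing group offset falls back to the earliest offset, a missing timestamp match falls back to the latest offset) are assumptions made only for illustration. Partition keys use the `topic-partition` form from the `start_mode.offsets` example.

```java
import java.util.Locale;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch of resolving a start offset; not the connector's implementation. */
final class StartModeSketch {

    /** The five start modes named in the documentation diff. */
    enum StartMode { EARLIEST, GROUP_OFFSETS, LATEST, SPECIFIC_OFFSETS, TIMESTAMP }

    /** Offsets already looked up for one partition; the caller supplies them in this sketch. */
    static final class PartitionOffsets {
        final long earliest;
        final long latest;
        final OptionalLong committed;    // consumer group offset, if one exists
        final OptionalLong atTimestamp;  // first offset at or after the configured timestamp, if any

        PartitionOffsets(long earliest, long latest, OptionalLong committed, OptionalLong atTimestamp) {
            this.earliest = earliest;
            this.latest = latest;
            this.committed = committed;
            this.atTimestamp = atTimestamp;
        }
    }

    /** Parses a config value such as "specific_offsets" into the enum. */
    static StartMode parse(String value) {
        return StartMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /** Picks the offset to start reading from for one partition. */
    static long resolve(StartMode mode, String partition, PartitionOffsets known,
                        Map<String, Long> specificOffsets) {
        switch (mode) {
            case EARLIEST:
                return known.earliest;
            case LATEST:
                return known.latest;
            case GROUP_OFFSETS:
                // Assumption: fall back to the earliest offset when the group has no commit.
                return known.committed.orElse(known.earliest);
            case TIMESTAMP:
                // Assumption: fall back to the latest offset when no record matches the timestamp.
                return known.atTimestamp.orElse(known.latest);
            case SPECIFIC_OFFSETS: {
                Long offset = specificOffsets.get(partition);
                if (offset == null) {
                    throw new IllegalArgumentException("No offset configured for " + partition);
                }
                return offset;
            }
            default:
                throw new IllegalStateException("Unhandled start mode: " + mode);
        }
    }
}
```

With `start_mode.offsets` containing `info-0 = 70`, calling `resolve(parse("specific_offsets"), "info-0", ...)` in this sketch returns 70, while a partition missing from the map is rejected rather than silently defaulted.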





[GitHub] [incubator-seatunnel] EricJoy2048 closed pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
EricJoy2048 closed pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config
URL: https://github.com/apache/incubator-seatunnel/pull/3157




[GitHub] [incubator-seatunnel] Carl-Zhou-CN commented on a diff in pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#discussion_r1005492534


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java:
##########
@@ -91,20 +122,20 @@ public void close() throws IOException {
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(convertToNextSplit(splits));
+            pendingSplit.putAll(convertToNextSplit(splits));
             assignSplit();
         }
     }
 
-    private Collection<? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
+    private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
-                getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            Map<TopicPartition, Long> listOffsets =
+                listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.earliest());

Review Comment:
   Sorry, that was my mistake. I should have used `OffsetSpec.latest()`.





[GitHub] [incubator-seatunnel] Carl-Zhou-CN commented on pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
Carl-Zhou-CN commented on PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#issuecomment-1286842748

   > Please refer to the [Coding Guide](https://seatunnel.apache.org/docs/contribution/coding-guide#how-to-submit-a-high-quality-pull-request) and change your pull request title. Thanks.
   
   ok




[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#discussion_r1005498384


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java:
##########
@@ -91,20 +122,20 @@ public void close() throws IOException {
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(convertToNextSplit(splits));
+            pendingSplit.putAll(convertToNextSplit(splits));
             assignSplit();
         }
     }
 
-    private Collection<? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
+    private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
-                getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            Map<TopicPartition, Long> listOffsets =
+                listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.earliest());

Review Comment:
   Maybe you should add some test cases.





[GitHub] [incubator-seatunnel] Carl-Zhou-CN commented on a diff in pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#discussion_r1005540004


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java:
##########
@@ -91,20 +122,20 @@ public void close() throws IOException {
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(convertToNextSplit(splits));
+            pendingSplit.putAll(convertToNextSplit(splits));
             assignSplit();
         }
     }
 
-    private Collection<? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
+    private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
-                getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            Map<TopicPartition, Long> listOffsets =
+                listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.earliest());

Review Comment:
   OK, I will add e2e tests later.





[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #3157: Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#issuecomment-1286834878

   Please refer to the [Coding Guide](https://seatunnel.apache.org/docs/contribution/coding-guide#how-to-submit-a-high-quality-pull-request) and change your pull request title. Thanks.




[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157




[GitHub] [incubator-seatunnel] Carl-Zhou-CN commented on a diff in pull request #3157: [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

Posted by GitBox <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #3157:
URL: https://github.com/apache/incubator-seatunnel/pull/3157#discussion_r1017277380


##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -66,6 +69,24 @@ The structure of the data, including field names and field types.
 Data format. The default format is json. Optional text format. The default field separator is ", ".
 If you customize the delimiter, add the "field_delimiter" option.
 
+## start_mode
+The initial consumption pattern of consumers,there are several types:
+[earliest],[group_offsets],[latest],[specific_offsets],[timestamp]
+
+## start_mode.timestamp
+The time required for consumption mode to be timestamp
+
+##  start_mode.offsets
+The offset required for consumption mode to be specific_offsets
+for example:
+```hocon
+   start_mode.offsets = {
+            info-0 = 70
+            info-1 = 10
+            info-2 = 10
+         }
+```
+

Review Comment:
   OK, thank you.


