Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/10/09 11:00:10 UTC

[GitHub] [rocketmq-flink] deemogsw commented on a diff in pull request #62: [Feature] Support rich initialization modes of RocketMQ source

deemogsw commented on code in PR #62:
URL: https://github.com/apache/rocketmq-flink/pull/62#discussion_r990773071


##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class `org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` |
 | consumer.offset.persist.interval | auto commit offset interval      |    5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 |
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were not found      |    10 |
 
+### Consumer From Where
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+        new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1), 201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3), 123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
+RocketMQSourceFunction offer five initialization policies 
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                            | DESCRIPTION                                                  |
+| ----------------------------------- | ------------------------------------------------------------ |
+| StartFromEarliest                   | consume from the earliest offset after restart with no state |

Review Comment:
   > Thanks @deemogsw for the feature contribution. I left some comments on the feature support for `RocketMQSourceFunction`. Could you please also support this feature for `RocketMQSource`?
   > 
   > BTW, could you please take a look at the failure of the CI?
   
   Thanks for the comments.
   I'm glad to refine these features for the new source interface. I will open a new issue for `RocketMQSource`.


