You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/10/09 03:09:20 UTC

[GitHub] [rocketmq-flink] SteNicholas commented on a diff in pull request #46: [ISSUE #35]Use litePullConsumer model instead of default p…

SteNicholas commented on code in PR #46:
URL: https://github.com/apache/rocketmq-flink/pull/46#discussion_r990724567


##########
src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java:
##########
@@ -75,6 +70,8 @@
     private final long startTime;
     private final long startOffset;
 
+    private final int pollTime;

Review Comment:
   ```suggestion
       private final long pollTime;
   ```



##########
src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java:
##########
@@ -404,8 +380,7 @@ private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
     private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
         offsetTable.put(mq, offset);
         if (!enableCheckpoint) {
-            consumer.updateConsumeOffset(mq, offset);
-            consumer.getOffsetStore().persist(consumer.queueWithNamespace(mq));
+            consumer.getOffsetStore().updateOffset(mq, offset, false);

Review Comment:
   Why not persist the offset? IMO, the offset should be persisted here. cc @ShannonDing @zhouxinyu 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org