You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/14 13:23:18 UTC

[flink] 01/02: [FLINK-25044][pulsar][test]: fix the messsageId overflow when set to latest

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4b9f06375e28a5107d4b0108e7478c574bf05cd
Author: Yufei Zhang <af...@gmail.com>
AuthorDate: Tue Dec 7 20:41:36 2021 +0800

    [FLINK-25044][pulsar][test]: fix the messsageId overflow when set to latest
---
 .../source/enumerator/cursor/start/MessageIdStartCursor.java   | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
index 0185bb3..f807960 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -53,9 +53,13 @@ public class MessageIdStartCursor implements StartCursor {
                     messageId instanceof MessageIdImpl,
                     "We only support normal message id and batch message id.");
             MessageIdImpl id = (MessageIdImpl) messageId;
-            this.messageId =
-                    new MessageIdImpl(
-                            id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
+            if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
+                this.messageId = messageId;
+            } else {
+                this.messageId =
+                        new MessageIdImpl(
+                                id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
+            }
         }
     }