You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/14 13:23:18 UTC
[flink] 01/02: [FLINK-25044][pulsar][test]: fix the messsageId overflow when set to latest
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b4b9f06375e28a5107d4b0108e7478c574bf05cd
Author: Yufei Zhang <af...@gmail.com>
AuthorDate: Tue Dec 7 20:41:36 2021 +0800
[FLINK-25044][pulsar][test]: fix the messsageId overflow when set to latest
---
.../source/enumerator/cursor/start/MessageIdStartCursor.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
index 0185bb3..f807960 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -53,9 +53,13 @@ public class MessageIdStartCursor implements StartCursor {
messageId instanceof MessageIdImpl,
"We only support normal message id and batch message id.");
MessageIdImpl id = (MessageIdImpl) messageId;
- this.messageId =
- new MessageIdImpl(
- id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
+ if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
+ this.messageId = messageId;
+ } else {
+ this.messageId =
+ new MessageIdImpl(
+ id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
+ }
}
}