You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/06 17:56:30 UTC
[pulsar] branch branch-2.8 updated: [fix][broker] Fix RawReader out of order (#16390)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new b7f66667434 [fix][broker] Fix RawReader out of order (#16390)
b7f66667434 is described below
commit b7f666674348079d6737f9e67529867eaa92f101
Author: Hang Chen <ch...@apache.org>
AuthorDate: Thu Jul 7 01:54:22 2022 +0800
[fix][broker] Fix RawReader out of order (#16390)
* fix RawReader out of order
* address comments
* tune code
---
.../src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 7c75f099773..bf9fbec59f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -210,9 +210,10 @@ public class RawReaderImpl implements RawReader {
log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,
messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition());
}
+
incomingRawMessages.add(
- new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
- tryCompletePending();
+ new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
+ internalPinnedExecutor.execute(this::tryCompletePending);
}
}