You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/21 06:35:05 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 6b17e995875 [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
6b17e995875 is described below
commit 6b17e9958752c7bffc26d1a8522f7959d035d550
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Mon Dec 19 23:59:35 2022 -0600
[fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.
Here is the single `BaseCommand`:
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99
Here is the method call that resets the object:
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114
Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.
This PR copies relevant values or objects into other variables.
* Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread.
* Move logic to copy certain command fields to earlier in method for `handleSubscribe`
* Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured.
This is a trivial change that is already covered by tests.
- [x] `doc-not-needed`
This is an internal change.
PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/8
(cherry picked from commit a408e9e392d48dcda7c17cd9b9e85e530c94998d)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 14 +++++---------
1 file changed, 5 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d18e154f379..9e04dd9ccde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -967,6 +967,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
: emptyKeySharedMeta;
+ final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
+ final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
+ subscribe.getSubscriptionPropertiesList());
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName,
@@ -1029,14 +1032,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());
- final long consumerEpoch;
- if (subscribe.hasConsumerEpoch()) {
- consumerEpoch = subscribe.getConsumerEpoch();
- } else {
- consumerEpoch = DEFAULT_CONSUMER_EPOCH;
- }
- Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
- subscribe.getSubscriptionPropertiesList());
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
@@ -1552,6 +1547,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
+ final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
@@ -1561,7 +1557,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
requestId, null, null, consumerId));
}
if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
+ getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
}
}).exceptionally(e -> {
if (hasRequestId) {