Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/21 06:35:05 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 6b17e995875 [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
6b17e995875 is described below

commit 6b17e9958752c7bffc26d1a8522f7959d035d550
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Mon Dec 19 23:59:35 2022 -0600

    [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
    
    In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.
    
    Here is the single `BaseCommand`:
    
    https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99
    
    Here is the method call that resets the object:
    
    https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114
    
    Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.
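
The hazard described above can be reproduced without Pulsar. The following is a minimal single-threaded sketch, not the actual decoder code: `FakeCommand` and `ReusedCommandDemo` are hypothetical stand-ins, and the assumption is only that parsing clears and then overwrites one shared object, as the commit message states. A callback that captures the reference sees the next message's value, while a callback that captures a copied `final` value does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ReusedCommandDemo {
    // Hypothetical stand-in for a generated proto command (not a Pulsar class).
    static final class FakeCommand {
        private long requestId;

        // Mimics the described behaviour: clear first, then overwrite in place.
        FakeCommand parseFrom(long newRequestId) {
            clear();
            this.requestId = newRequestId;
            return this;
        }

        void clear() {
            requestId = 0;
        }

        long getRequestId() {
            return requestId;
        }
    }

    static List<Long> run() {
        FakeCommand shared = new FakeCommand();

        // First incoming message.
        shared.parseFrom(1L);

        // Unsafe: the callback keeps a reference to the shared, reused object.
        Supplier<Long> unsafe = () -> shared.getRequestId();

        // Safe: the field is copied into a final local before the callback is built.
        final long requestId = shared.getRequestId();
        Supplier<Long> safe = () -> requestId;

        // Second incoming message overwrites the same object.
        shared.parseFrom(2L);

        List<Long> observed = new ArrayList<>();
        observed.add(unsafe.get()); // reads the second message's value
        observed.add(safe.get());   // still reads the first message's value
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 1]
    }
}
```

In the real broker the callback would typically run on another thread, which adds visibility and ordering problems on top of the stale-reference problem shown here.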
    
    This PR copies the relevant values or objects into final local variables before any reference to them is published to another thread.
    
    * Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread.
    * In `handleSubscribe`, move the logic that copies certain command fields earlier in the method, before any asynchronous callback is registered.
    * Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured.
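
The conditional copy in the last bullet can be sketched in isolation. This is an illustration under stated assumptions, not the `ServerCnx` code: `FakeAck`, `FakeInterceptor`, and `ConditionalCopyDemo` are hypothetical names, and the only pattern taken from the commit is "copy the command into a new object when an interceptor is configured, otherwise skip the copy".

```java
import java.util.concurrent.CompletableFuture;

public class ConditionalCopyDemo {
    // Hypothetical stand-in for the ack command (not a Pulsar class).
    static final class FakeAck {
        long consumerId;

        FakeAck copyFrom(FakeAck other) {
            this.consumerId = other.consumerId;
            return this;
        }
    }

    // Hypothetical stand-in for a broker interceptor.
    interface FakeInterceptor {
        void messageAcked(FakeAck ack);
    }

    // Returns the consumer id the interceptor observed, or -1 if none is configured.
    static long run(boolean interceptorConfigured) {
        long[] seen = {-1L};
        FakeInterceptor interceptor = interceptorConfigured ? a -> seen[0] = a.consumerId : null;

        FakeAck shared = new FakeAck();
        shared.consumerId = 7L;

        // Pay for the copy only when something will read it later.
        final FakeAck finalAck = interceptor != null ? new FakeAck().copyFrom(shared) : null;

        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<Void> done = pending.thenRun(() -> {
            if (interceptor != null) {
                interceptor.messageAcked(finalAck);
            }
        });

        // The shared object is reused for the next message before the callback runs.
        shared.consumerId = 99L;
        pending.complete(null);
        done.join();
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints 7
        System.out.println(run(false)); // prints -1
    }
}
```

The design trade-off is that the null check appears twice, once when deciding whether to copy and once inside the callback, in exchange for avoiding an allocation on every ack when no interceptor is present.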
    
    This is a trivial change that is already covered by tests.
    
    - [x] `doc-not-needed`
    
    This is an internal change.
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/8
    
    (cherry picked from commit a408e9e392d48dcda7c17cd9b9e85e530c94998d)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java   | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d18e154f379..9e04dd9ccde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -967,6 +967,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
               ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
               : emptyKeySharedMeta;
+        final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
+        final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
+                subscribe.getSubscriptionPropertiesList());
 
         CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
                 topicName,
@@ -1029,14 +1032,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 boolean createTopicIfDoesNotExist = forceTopicCreation
                         && service.isAllowAutoTopicCreation(topicName.toString());
 
-                final long consumerEpoch;
-                if (subscribe.hasConsumerEpoch()) {
-                    consumerEpoch = subscribe.getConsumerEpoch();
-                } else {
-                    consumerEpoch = DEFAULT_CONSUMER_EPOCH;
-                }
-                Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
-                        subscribe.getSubscriptionPropertiesList());
                 service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                         .thenCompose(optTopic -> {
                             if (!optTopic.isPresent()) {
@@ -1552,6 +1547,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final boolean hasRequestId = ack.hasRequestId();
         final long requestId = hasRequestId ? ack.getRequestId() : 0;
         final long consumerId = ack.getConsumerId();
+        final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
@@ -1561,7 +1557,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                     requestId, null, null, consumerId));
                         }
                         if (getBrokerService().getInterceptor() != null) {
-                            getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
+                            getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
                         }
                     }).exceptionally(e -> {
                         if (hasRequestId) {