You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/21 06:34:23 UTC

[pulsar] branch branch-2.11 updated: [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new af3e1c24f2d [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
af3e1c24f2d is described below

commit af3e1c24f2d500b57a68be63c5d4f475477eba57
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Mon Dec 19 23:59:35 2022 -0600

    [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
    
    In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.
    
    Here is the single `BaseCommand`:
    
    https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99
    
    Here is the method call that resets the object:
    
    https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114
    
    Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.
    
    This PR copies relevant values or objects into other variables.
    
    * Replace `command.getTcId()` with `tcId.getId()`, since `tcId` is a final variable that is safe to publish to another thread.
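    * In `handleSubscribe`, move the logic that copies certain command fields (`consumerEpoch` and `subscriptionProperties`) earlier in the method, before any asynchronous callbacks are registered.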
    * Copy the `ack` object into a new `CommandAck` (`finalAck`) when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when an interceptor is configured.
    
    This is a trivial change that is already covered by tests.
    
    - [x] `doc-not-needed`
    
    This is an internal change.
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/8
    
    (cherry picked from commit a408e9e392d48dcda7c17cd9b9e85e530c94998d)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index af0b1ca17e1..c566eab93a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1015,6 +1015,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
               ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
               : emptyKeySharedMeta;
+        final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
+        final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
+                subscribe.getSubscriptionPropertiesList());
 
         if (log.isDebugEnabled()) {
             log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName,
@@ -1082,14 +1085,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 boolean createTopicIfDoesNotExist = forceTopicCreation
                         && service.isAllowAutoTopicCreation(topicName.toString());
 
-                final long consumerEpoch;
-                if (subscribe.hasConsumerEpoch()) {
-                    consumerEpoch = subscribe.getConsumerEpoch();
-                } else {
-                    consumerEpoch = DEFAULT_CONSUMER_EPOCH;
-                }
-                Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
-                        subscribe.getSubscriptionPropertiesList());
                 service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                         .thenCompose(optTopic -> {
                             if (!optTopic.isPresent()) {
@@ -1612,6 +1607,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final boolean hasRequestId = ack.hasRequestId();
         final long requestId = hasRequestId ? ack.getRequestId() : 0;
         final long consumerId = ack.getConsumerId();
+        final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
@@ -1621,7 +1617,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                     requestId, null, null, consumerId));
                         }
                         if (getBrokerService().getInterceptor() != null) {
-                            getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
+                            getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
                         }
                     }).exceptionally(e -> {
                         if (hasRequestId) {
@@ -2282,7 +2278,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     if (log.isDebugEnabled()) {
                         log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
                     }
-                    commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId());
+                    commandSender.sendNewTxnResponse(requestId, txnID, tcId.getId());
                 } else {
                     if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) {
                         // if new txn throw ReachMaxActiveTxnException, don't return any response to client,