You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/02/21 04:05:10 UTC

[incubator-pulsar] branch branch-1.22 updated: Fix in ServerCnx to prevent using recycled commands (#1264)

This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
     new 5d14788  Fix in ServerCnx to prevent using recycled commands (#1264)
5d14788 is described below

commit 5d14788e510faec23fd8ed189ed343e93b489dda
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Tue Feb 20 20:04:20 2018 -0800

    Fix in ServerCnx to prevent using recycled commands (#1264)
---
 .../org/apache/pulsar/broker/service/ServerCnx.java  | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bdcc6e3..85cedde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -209,7 +209,7 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
         final long requestId = lookup.getRequestId();
-
+        final boolean authoritative = lookup.getAuthoritative();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
         }
@@ -254,9 +254,9 @@ public class ServerCnx extends PulsarHandler {
             String finalOriginalPrincipal = originalPrincipal;
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
                 if (isProxyAuthorized) {
-                    lookupDestinationAsync(getBrokerService().pulsar(), topicName, lookup.getAuthoritative(),
+                    lookupDestinationAsync(getBrokerService().pulsar(), topicName, authoritative,
                             finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
-                            lookup.getRequestId()).handle((lookupResponse, ex) -> {
+                            requestId).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     ctx.writeAndFlush(lookupResponse);
                                 } else {
@@ -550,7 +550,7 @@ public class ServerCnx extends PulsarHandler {
                 subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
                 subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
                 : null;
-
+        final String subscription = subscribe.getSubscription();
         final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
@@ -568,7 +568,7 @@ public class ServerCnx extends PulsarHandler {
                 if (service.isAuthorizationEnabled()) {
                     authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
                             originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
-                            subscribe.getSubscription());
+                            subscription);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
@@ -995,13 +995,13 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleSeek(CommandSeek seek) {
         checkArgument(state == State.Connected);
-
+        final long requestId = seek.getRequestId();
         CompletableFuture<Consumer> consumerFuture = consumers.get(seek.getConsumerId());
 
         // Currently only seeking on a message id is supported
         if (!seek.hasMessageId()) {
             ctx.writeAndFlush(
-                    Commands.newError(seek.getRequestId(), ServerError.MetadataError, "Message id was not present"));
+                    Commands.newError(requestId, ServerError.MetadataError, "Message id was not present"));
             return;
         }
 
@@ -1011,7 +1011,7 @@ public class ServerCnx extends PulsarHandler {
             MessageIdData msgIdData = seek.getMessageId();
 
             Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
-            long requestId = seek.getRequestId();
+            
 
             subscription.resetCursor(position).thenRun(() -> {
                 log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress,
@@ -1019,12 +1019,12 @@ public class ServerCnx extends PulsarHandler {
                 ctx.writeAndFlush(Commands.newSuccess(requestId));
             }).exceptionally(ex -> {
                 log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex);
-                ctx.writeAndFlush(Commands.newError(seek.getRequestId(), ServerError.UnknownError,
+                ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
                         "Error when resetting subscription: " + ex.getCause().getMessage()));
                 return null;
             });
         } else {
-            ctx.writeAndFlush(Commands.newError(seek.getRequestId(), ServerError.MetadataError, "Consumer not found"));
+            ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Consumer not found"));
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
jai1@apache.org.