You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/09/16 13:40:55 UTC

[pulsar] branch master updated: Cancel retry if the error is an authentication failure (#8058)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bf96619  Cancel retry if the error is an authentication failure (#8058)
bf96619 is described below

commit bf96619ae96655a27baff15551305fa7cbcb2835
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed Sep 16 21:39:56 2020 +0800

    Cancel retry if the error is an authentication failure (#8058)
    
    ---
    
    Fixes #7929
    
    *Motivation*
    
    We shouldn't retry connecting to the server if the client fails with an authentication error.
---
 .../main/java/org/apache/pulsar/client/impl/ClientCnx.java |  4 ++++
 .../org/apache/pulsar/client/impl/PulsarClientImpl.java    |  4 +++-
 .../auth/token/PulsarTokenAuthenticationBaseSuite.java     | 14 ++++++++++++++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index b32dc99..6f8c511 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -607,6 +607,10 @@ public class ClientCnx extends PulsarHandler {
             log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic",
                     ctx.channel());
         }
+        if (error.getError() == ServerError.AuthenticationError) {
+            connectionFuture.completeExceptionally(new PulsarClientException.AuthenticationException(error.getMessage()));
+            log.error("{} Failed to authenticate the client", ctx.channel());
+        }
         CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage()));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6c4dced..c33eb9b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -704,7 +704,9 @@ public class PulsarClientImpl implements PulsarClient {
         lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             // skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException`
-            boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause()) || e.getCause() instanceof PulsarClientException.TooManyRequestsException;
+            boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause())
+                || e.getCause() instanceof PulsarClientException.TooManyRequestsException
+                || e.getCause() instanceof PulsarClientException.AuthenticationException;
             if (nextDelay <= 0 || isLookupThrottling) {
                 future.completeExceptionally(e);
                 return null;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index ad0fa83..7a923ee 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -329,4 +329,18 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
             }
         }
     }
+
+    @Test
+    public void testAuthenticationFailedImmediately() throws PulsarClientException {
+        try {
+            @Cleanup
+            PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .authentication(AuthenticationFactory.token("invalid_token"))
+                .build();
+            client.newProducer().topic("test_token_topic" + randomName(4));
+        } catch (PulsarClientException.AuthenticationException pae) {
+            // expected error
+        }
+    }
 }