You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/03/22 00:23:41 UTC

[pulsar] branch master updated: Do not retry on authorization failure (#6577)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cb0d25  Do not retry on authorization failure (#6577)
6cb0d25 is described below

commit 6cb0d25aea861d6e4d927c483cf838330887be7a
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Sat Mar 21 17:23:25 2020 -0700

    Do not retry on authorization failure (#6577)
    
    * Do not retry on authorization failure
    
    * Address feedback
    
    * Fix logic
    
    * Fix test
    
    * Fixed more tests
    
    * Fixed more test
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../apache/pulsar/client/api/ClientErrorsTest.java    | 10 +++++-----
 .../websocket/proxy/ProxyPublishConsumeTest.java      |  4 ++--
 .../pulsar/client/api/PulsarClientException.java      | 19 +++++++++++++++++++
 .../apache/pulsar/client/impl/ConnectionHandler.java  |  4 ----
 .../org/apache/pulsar/client/impl/ConsumerImpl.java   |  2 +-
 .../org/apache/pulsar/client/impl/ProducerImpl.java   |  2 +-
 .../apache/pulsar/client/impl/PulsarClientImpl.java   |  2 +-
 7 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index cfb7c02..7751342 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -208,7 +208,7 @@ public class ClientErrorsTest {
         mockBrokerService.setHandleProducer((ctx, producer) -> {
             if (counter.incrementAndGet() == 2) {
                 // fail second producer
-                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthenticationError, "msg"));
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
                 return;
             }
             ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
@@ -436,7 +436,7 @@ public class ClientErrorsTest {
         mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
             if (counter.incrementAndGet() == 2) {
                 // fail second producer
-                ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg"));
+                ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg"));
                 return;
             }
             ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
@@ -507,7 +507,7 @@ public class ClientErrorsTest {
         mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
             System.err.println("subscribeCounter: " + subscribeCounter.get());
             if (subscribeCounter.incrementAndGet() == 3) {
-                ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg"));
+                ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg"));
                 return;
             }
             ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
@@ -520,8 +520,8 @@ public class ClientErrorsTest {
 
         try {
             client.newConsumer().topic("persistent://prop/use/ns/multi-part-t1").subscriptionName("sub1").subscribe();
-            fail("Should have failed with an authentication error");
-        } catch (PulsarClientException.AuthenticationException e) {
+            fail("Should have failed with an authorization error");
+        } catch (PulsarClientException.AuthorizationException e) {
         }
 
         // should call close for 3 partitions
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 009596f..d33ed58 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -208,7 +208,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = 10000)
+    @Test(timeOut = 1000000)
     public void conflictingConsumerTest() throws Exception {
         final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                 + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
@@ -244,7 +244,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = 10000)
+    @Test(timeOut = 100000)
     public void conflictingProducerTest() throws Exception {
         final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                 + "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer";
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index f751cc2..8703caa 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -719,4 +719,23 @@ public class PulsarClientException extends IOException {
             return new PulsarClientException(t);
         }
     }
+
+    public static boolean isRetriableError(Throwable t) {
+        if (t instanceof TooManyRequestsException
+                || t instanceof AuthorizationException
+                || t instanceof InvalidServiceURL
+                || t instanceof InvalidConfigurationException
+                || t instanceof NotFoundException
+                || t instanceof IncompatibleSchemaException
+                || t instanceof TopicDoesNotExistException
+                || t instanceof UnsupportedAuthenticationException
+                || t instanceof InvalidMessageException
+                || t instanceof InvalidTopicNameException
+                || t instanceof NotSupportedException
+                || t instanceof ChecksumException
+                || t instanceof CryptoException) {
+            return false;
+        }
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 8eab9ab..0df0aae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -130,10 +130,6 @@ public class ConnectionHandler {
         return CLIENT_CNX_UPDATER.get(this);
     }
 
-    protected boolean isRetriableError(PulsarClientException e) {
-        return e instanceof PulsarClientException.LookupException;
-    }
-
     protected void setClientCnx(ClientCnx clientCnx) {
         CLIENT_CNX_UPDATER.set(this, clientCnx);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 14d0aeb..5312675 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -628,7 +628,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());
 
             if (e.getCause() instanceof PulsarClientException
-                    && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
+                    && PulsarClientException.isRetriableError(e.getCause())
                     && System.currentTimeMillis() < subscribeTimeout) {
                 reconnectLater(e.getCause());
             } else if (!subscribeFuture.isDone()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index fc16fe8..989a284 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1178,7 +1178,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                         producerCreatedFuture.completeExceptionally(cause);
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //
-                    (cause instanceof PulsarClientException && connectionHandler.isRetriableError((PulsarClientException) cause)
+                    (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause)
                             && System.currentTimeMillis() < createProducerTimeout)) {
                         // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
                         // still within the initial timeout budget and we are dealing with a retriable error
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 27a158d..31a4ed0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -679,7 +679,7 @@ public class PulsarClientImpl implements PulsarClient {
         lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             // skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException`
-            boolean isLookupThrottling = e.getCause() instanceof PulsarClientException.TooManyRequestsException;
+            boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause());
             if (nextDelay <= 0 || isLookupThrottling) {
                 future.completeExceptionally(e);
                 return null;