You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/03/22 00:23:41 UTC
[pulsar] branch master updated: Do not retry on authorization failure (#6577)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6cb0d25 Do not retry on authorization failure (#6577)
6cb0d25 is described below
commit 6cb0d25aea861d6e4d927c483cf838330887be7a
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Sat Mar 21 17:23:25 2020 -0700
Do not retry on authorization failure (#6577)
* Do not retry on authorization failure
* Address feedback
* Fix logic
* Fix test
* Fixed more tests
* Fixed more test
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../apache/pulsar/client/api/ClientErrorsTest.java | 10 +++++-----
.../websocket/proxy/ProxyPublishConsumeTest.java | 4 ++--
.../pulsar/client/api/PulsarClientException.java | 19 +++++++++++++++++++
.../apache/pulsar/client/impl/ConnectionHandler.java | 4 ----
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../org/apache/pulsar/client/impl/ProducerImpl.java | 2 +-
.../apache/pulsar/client/impl/PulsarClientImpl.java | 2 +-
7 files changed, 29 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index cfb7c02..7751342 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -208,7 +208,7 @@ public class ClientErrorsTest {
mockBrokerService.setHandleProducer((ctx, producer) -> {
if (counter.incrementAndGet() == 2) {
// fail second producer
- ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthenticationError, "msg"));
+ ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
@@ -436,7 +436,7 @@ public class ClientErrorsTest {
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
if (counter.incrementAndGet() == 2) {
// fail second producer
- ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg"));
+ ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
@@ -507,7 +507,7 @@ public class ClientErrorsTest {
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
System.err.println("subscribeCounter: " + subscribeCounter.get());
if (subscribeCounter.incrementAndGet() == 3) {
- ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg"));
+ ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
@@ -520,8 +520,8 @@ public class ClientErrorsTest {
try {
client.newConsumer().topic("persistent://prop/use/ns/multi-part-t1").subscriptionName("sub1").subscribe();
- fail("Should have failed with an authentication error");
- } catch (PulsarClientException.AuthenticationException e) {
+ fail("Should have failed with an authorization error");
+ } catch (PulsarClientException.AuthorizationException e) {
}
// should call close for 3 partitions
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 009596f..d33ed58 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -208,7 +208,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
- @Test(timeOut = 10000)
+ @Test(timeOut = 1000000)
public void conflictingConsumerTest() throws Exception {
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
@@ -244,7 +244,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
- @Test(timeOut = 10000)
+ @Test(timeOut = 100000)
public void conflictingProducerTest() throws Exception {
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer";
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index f751cc2..8703caa 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -719,4 +719,23 @@ public class PulsarClientException extends IOException {
return new PulsarClientException(t);
}
}
+
+ public static boolean isRetriableError(Throwable t) {
+ if (t instanceof TooManyRequestsException
+ || t instanceof AuthorizationException
+ || t instanceof InvalidServiceURL
+ || t instanceof InvalidConfigurationException
+ || t instanceof NotFoundException
+ || t instanceof IncompatibleSchemaException
+ || t instanceof TopicDoesNotExistException
+ || t instanceof UnsupportedAuthenticationException
+ || t instanceof InvalidMessageException
+ || t instanceof InvalidTopicNameException
+ || t instanceof NotSupportedException
+ || t instanceof ChecksumException
+ || t instanceof CryptoException) {
+ return false;
+ }
+ return true;
+ }
}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 8eab9ab..0df0aae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -130,10 +130,6 @@ public class ConnectionHandler {
return CLIENT_CNX_UPDATER.get(this);
}
- protected boolean isRetriableError(PulsarClientException e) {
- return e instanceof PulsarClientException.LookupException;
- }
-
protected void setClientCnx(ClientCnx clientCnx) {
CLIENT_CNX_UPDATER.set(this, clientCnx);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 14d0aeb..5312675 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -628,7 +628,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());
if (e.getCause() instanceof PulsarClientException
- && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
+ && PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() < subscribeTimeout) {
reconnectLater(e.getCause());
} else if (!subscribeFuture.isDone()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index fc16fe8..989a284 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1178,7 +1178,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
producerCreatedFuture.completeExceptionally(cause);
client.cleanupProducer(this);
} else if (producerCreatedFuture.isDone() || //
- (cause instanceof PulsarClientException && connectionHandler.isRetriableError((PulsarClientException) cause)
+ (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause)
&& System.currentTimeMillis() < createProducerTimeout)) {
// Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
// still within the initial timeout budget and we are dealing with a retriable error
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 27a158d..31a4ed0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -679,7 +679,7 @@ public class PulsarClientImpl implements PulsarClient {
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
// skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException`
- boolean isLookupThrottling = e.getCause() instanceof PulsarClientException.TooManyRequestsException;
+ boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause());
if (nextDelay <= 0 || isLookupThrottling) {
future.completeExceptionally(e);
return null;