You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:09 UTC
[pulsar] 13/14: Handle NotAllowed Exception at the client side.
(#7430)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 754b864cf5f8844881eb9d47f4eaba6b4fb6d5c2
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jul 17 09:22:40 2020 +0800
Handle NotAllowed Exception at the client side. (#7430)
* Handle NotAllowed Exception at the client side.
(cherry picked from commit f8b2a2334fb7d2dc5266242a6393c9cc434fba60)
---
.../broker/service/BrokerServiceException.java | 2 ++
.../client/api/KeySharedSubscriptionTest.java | 2 +-
.../pulsar/client/api/PulsarClientException.java | 22 ++++++++++++++++++++++
pulsar-client-cpp/include/pulsar/Result.h | 1 +
pulsar-client-cpp/lib/ClientConnection.cc | 3 +++
pulsar-client-cpp/lib/Result.cc | 3 +++
.../org/apache/pulsar/client/impl/ClientCnx.java | 2 ++
.../apache/pulsar/common/api/proto/PulsarApi.java | 3 +++
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
9 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 6d0e50f..7ec97ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -212,6 +212,8 @@ public class BrokerServiceException extends Exception {
return ServerError.TransactionCoordinatorNotFound;
} else if (t instanceof CoordinatorException.InvalidTxnStatusException) {
return ServerError.InvalidTxnStatus;
+ } else if (t instanceof NotAllowedException) {
+ return ServerError.NotAllowedError;
} else {
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 2a7a20b..610c4d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -397,7 +397,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
receiveAndCheck(checkList);
}
- @Test(expectedExceptions = PulsarClientException.class)
+ @Test(expectedExceptions = PulsarClientException.NotAllowedException.class)
public void testDisableKeySharedSubscription() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(false);
String topic = "persistent://public/default/key_shared_disabled";
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 6a6e42a..597e0d5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -640,6 +640,23 @@ public class PulsarClientException extends IOException {
}
/**
+ * Not allowed exception thrown by Pulsar client.
+ */
+ public static class NotAllowedException extends PulsarClientException {
+
+ /**
+ * Constructs an {@code NotAllowedException} with the specified detail message.
+ *
+ * @param msg
+ * The detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method)
+ */
+ public NotAllowedException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
* Full producer queue error thrown by Pulsar client.
*/
public static class ProducerQueueIsFullError extends PulsarClientException {
@@ -790,6 +807,8 @@ public class PulsarClientException extends IOException {
return new InvalidTopicNameException(msg);
} else if (t instanceof NotSupportedException) {
return new NotSupportedException(msg);
+ } else if (t instanceof NotAllowedException) {
+ return new NotAllowedException(msg);
} else if (t instanceof ProducerQueueIsFullError) {
return new ProducerQueueIsFullError(msg);
} else if (t instanceof ProducerBlockedQuotaExceededError) {
@@ -873,6 +892,8 @@ public class PulsarClientException extends IOException {
return new InvalidTopicNameException(msg);
} else if (cause instanceof NotSupportedException) {
return new NotSupportedException(msg);
+ } else if (cause instanceof NotAllowedException) {
+ return new NotAllowedException(msg);
} else if (cause instanceof ProducerQueueIsFullError) {
return new ProducerQueueIsFullError(msg);
} else if (cause instanceof ProducerBlockedQuotaExceededError) {
@@ -911,6 +932,7 @@ public class PulsarClientException extends IOException {
|| t instanceof InvalidMessageException
|| t instanceof InvalidTopicNameException
|| t instanceof NotSupportedException
+ || t instanceof NotAllowedException
|| t instanceof ChecksumException
|| t instanceof CryptoException
|| t instanceof ConsumerAssignException
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index b2a16c5..6dd7e45 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -82,6 +82,7 @@ enum Result
/// Shared and Key_Shared subscription mode
ResultTransactionCoordinatorNotFoundError, /// Transaction coordinator not found
ResultInvalidTxnStatusError, /// Invalid txn status error
+ ResultNotAllowedError, /// Not allowed
};
// Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fe43805..3628dd5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -117,6 +117,9 @@ static Result getResult(ServerError serverError) {
case InvalidTxnStatus:
return ResultInvalidTxnStatusError;
+
+ case NotAllowedError:
+ return ResultNotAllowedError;
}
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index fc4d81f..3c1c2a8 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -144,6 +144,9 @@ const char* strResult(Result result) {
case ResultInvalidTxnStatusError:
return "ResultInvalidTxnStatusError";
+
+ case ResultNotAllowedError:
+ return "ResultNotAllowedError";
};
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index d63cbc1..ea1624b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1003,6 +1003,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case ConsumerAssignError:
return new PulsarClientException.ConsumerAssignException(errorMsg);
+ case NotAllowedError:
+ return new PulsarClientException.NotAllowedException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 02fed41..4d1babf 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -82,6 +82,7 @@ public final class PulsarApi {
ConsumerAssignError(19, 19),
TransactionCoordinatorNotFound(20, 20),
InvalidTxnStatus(21, 21),
+ NotAllowedError(22, 22),
;
public static final int UnknownError_VALUE = 0;
@@ -106,6 +107,7 @@ public final class PulsarApi {
public static final int ConsumerAssignError_VALUE = 19;
public static final int TransactionCoordinatorNotFound_VALUE = 20;
public static final int InvalidTxnStatus_VALUE = 21;
+ public static final int NotAllowedError_VALUE = 22;
public final int getNumber() { return value; }
@@ -134,6 +136,7 @@ public final class PulsarApi {
case 19: return ConsumerAssignError;
case 20: return TransactionCoordinatorNotFound;
case 21: return InvalidTxnStatus;
+ case 22: return NotAllowedError;
default: return null;
}
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index be5e4d4..c4acca2 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -196,6 +196,7 @@ enum ServerError {
TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error
InvalidTxnStatus = 21; // Invalid txn status error
+ NotAllowedError = 22; // Not allowed error
}
enum AuthMethod {