You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:09 UTC

[pulsar] 13/14: Handle NotAllowed Exception at the client side. (#7430)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 754b864cf5f8844881eb9d47f4eaba6b4fb6d5c2
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jul 17 09:22:40 2020 +0800

    Handle NotAllowed Exception at the client side. (#7430)
    
    * Handle NotAllowed Exception at the client side.
    
    (cherry picked from commit f8b2a2334fb7d2dc5266242a6393c9cc434fba60)
---
 .../broker/service/BrokerServiceException.java     |  2 ++
 .../client/api/KeySharedSubscriptionTest.java      |  2 +-
 .../pulsar/client/api/PulsarClientException.java   | 22 ++++++++++++++++++++++
 pulsar-client-cpp/include/pulsar/Result.h          |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |  3 +++
 pulsar-client-cpp/lib/Result.cc                    |  3 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 ++
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  3 +++
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 9 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 6d0e50f..7ec97ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -212,6 +212,8 @@ public class BrokerServiceException extends Exception {
             return ServerError.TransactionCoordinatorNotFound;
         } else if (t instanceof CoordinatorException.InvalidTxnStatusException) {
             return ServerError.InvalidTxnStatus;
+        } else if (t instanceof NotAllowedException) {
+            return ServerError.NotAllowedError;
         } else {
             if (checkCauseIfUnknown) {
                 return getClientErrorCode(t.getCause(), false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 2a7a20b..610c4d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -397,7 +397,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         receiveAndCheck(checkList);
     }
 
-    @Test(expectedExceptions = PulsarClientException.class)
+    @Test(expectedExceptions = PulsarClientException.NotAllowedException.class)
     public void testDisableKeySharedSubscription() throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(false);
         String topic = "persistent://public/default/key_shared_disabled";
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 6a6e42a..597e0d5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -640,6 +640,23 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Not allowed exception thrown by Pulsar client.
+     */
+    public static class NotAllowedException extends PulsarClientException {
+
+        /**
+         * Constructs a {@code NotAllowedException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         */
+        public NotAllowedException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
      * Full producer queue error thrown by Pulsar client.
      */
     public static class ProducerQueueIsFullError extends PulsarClientException {
@@ -790,6 +807,8 @@ public class PulsarClientException extends IOException {
             return new InvalidTopicNameException(msg);
         } else if (t instanceof NotSupportedException) {
             return new NotSupportedException(msg);
+        } else if (t instanceof NotAllowedException) {
+            return new NotAllowedException(msg);
         } else if (t instanceof ProducerQueueIsFullError) {
             return new ProducerQueueIsFullError(msg);
         } else if (t instanceof ProducerBlockedQuotaExceededError) {
@@ -873,6 +892,8 @@ public class PulsarClientException extends IOException {
             return new InvalidTopicNameException(msg);
         } else if (cause instanceof NotSupportedException) {
             return new NotSupportedException(msg);
+        } else if (cause instanceof NotAllowedException) {
+            return new NotAllowedException(msg);
         } else if (cause instanceof ProducerQueueIsFullError) {
             return new ProducerQueueIsFullError(msg);
         } else if (cause instanceof ProducerBlockedQuotaExceededError) {
@@ -911,6 +932,7 @@ public class PulsarClientException extends IOException {
                 || t instanceof InvalidMessageException
                 || t instanceof InvalidTopicNameException
                 || t instanceof NotSupportedException
+                || t instanceof NotAllowedException
                 || t instanceof ChecksumException
                 || t instanceof CryptoException
                 || t instanceof ConsumerAssignException
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index b2a16c5..6dd7e45 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -82,6 +82,7 @@ enum Result
                                                      /// Shared and Key_Shared subscription mode
     ResultTransactionCoordinatorNotFoundError,       /// Transaction coordinator not found
     ResultInvalidTxnStatusError,                     /// Invalid txn status error
+    ResultNotAllowedError,                           /// Not allowed
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fe43805..3628dd5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -117,6 +117,9 @@ static Result getResult(ServerError serverError) {
 
         case InvalidTxnStatus:
             return ResultInvalidTxnStatusError;
+
+        case NotAllowedError:
+            return ResultNotAllowedError;
     }
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index fc4d81f..3c1c2a8 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -144,6 +144,9 @@ const char* strResult(Result result) {
 
         case ResultInvalidTxnStatusError:
             return "ResultInvalidTxnStatusError";
+
+        case ResultNotAllowedError:
+            return "ResultNotAllowedError";
     };
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index d63cbc1..ea1624b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1003,6 +1003,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.TopicDoesNotExistException(errorMsg);
         case ConsumerAssignError:
             return new PulsarClientException.ConsumerAssignException(errorMsg);
+        case NotAllowedError:
+            return new PulsarClientException.NotAllowedException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 02fed41..4d1babf 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -82,6 +82,7 @@ public final class PulsarApi {
     ConsumerAssignError(19, 19),
     TransactionCoordinatorNotFound(20, 20),
     InvalidTxnStatus(21, 21),
+    NotAllowedError(22, 22),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -106,6 +107,7 @@ public final class PulsarApi {
     public static final int ConsumerAssignError_VALUE = 19;
     public static final int TransactionCoordinatorNotFound_VALUE = 20;
     public static final int InvalidTxnStatus_VALUE = 21;
+    public static final int NotAllowedError_VALUE = 22;
     
     
     public final int getNumber() { return value; }
@@ -134,6 +136,7 @@ public final class PulsarApi {
         case 19: return ConsumerAssignError;
         case 20: return TransactionCoordinatorNotFound;
         case 21: return InvalidTxnStatus;
+        case 22: return NotAllowedError;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index be5e4d4..c4acca2 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -196,6 +196,7 @@ enum ServerError {
 
     TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error
     InvalidTxnStatus = 21; // Invalid txn status error
+    NotAllowedError = 22; // Not allowed error
 }
 
 enum AuthMethod {