You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/10 14:46:44 UTC

[pulsar] branch master updated: Propagate specific Schema error to client (#3345)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e9a5e61  Propagate specific Schema error to client (#3345)
e9a5e61 is described below

commit e9a5e61f06db9780669b39a96c5c29428334a0fe
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jan 10 06:46:39 2019 -0800

    Propagate specific Schema error to client (#3345)
    
    * Propagate specific Schema error to client
    
    * Handling new enums in C++
    
    * Fixed formatting
---
 .../org/apache/pulsar/broker/service/BrokerServiceException.java   | 3 +++
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java  | 7 ++++---
 .../test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java   | 5 +++--
 .../java/org/apache/pulsar/client/api/PulsarClientException.java   | 6 ++++++
 pulsar-client-cpp/include/pulsar/Result.h                          | 2 ++
 pulsar-client-cpp/lib/ClientConnection.cc                          | 3 +++
 pulsar-client-cpp/lib/Result.cc                                    | 3 +++
 .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java     | 6 ++++--
 .../main/java/org/apache/pulsar/common/api/proto/PulsarApi.java    | 3 +++
 pulsar-common/src/main/proto/PulsarApi.proto                       | 2 ++
 10 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 31a4ac7..d037edf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 
 /**
@@ -163,6 +164,8 @@ public class BrokerServiceException extends Exception {
         } else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
                 || t instanceof SubscriptionFencedException) {
             return PulsarApi.ServerError.ServiceNotReady;
+        } else if (t instanceof IncompatibleSchemaException) {
+            return PulsarApi.ServerError.IncompatibleSchema;
         } else {
             return PulsarApi.ServerError.UnknownError;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 577d3ac..e6819bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -97,7 +97,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.schema.SchemaData;
-import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
@@ -600,7 +599,7 @@ public class ServerCnx extends PulsarHandler {
                                                                 readCompacted, initialPosition);
                                                     } else {
                                                         return FutureUtil.failedFuture(
-                                                                new BrokerServiceException(
+                                                                new IncompatibleSchemaException(
                                                                         "Trying to subscribe with incompatible schema"
                                                         ));
                                                     }
@@ -846,7 +845,9 @@ public class ServerCnx extends PulsarHandler {
                             }
 
                             schemaVersionFuture.exceptionally(exception -> {
-                                ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, exception.getMessage()));
+                                ctx.writeAndFlush(Commands.newError(requestId,
+                                        BrokerServiceException.getClientErrorCode(exception.getCause()),
+                                        exception.getMessage()));
                                 producers.remove(producerId, producerFuture);
                                 return null;
                             });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 7d524a9..5516e33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -156,7 +157,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             }
         } catch (PulsarClientException e) {
             if (schemaValidationEnforced) {
-                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+                Assert.assertTrue(e instanceof IncompatibleSchemaException);
             } else {
                 Assert.fail("Shouldn't throw IncompatibleSchemaException"
                     + " if SchemaValidationEnforced is disabled");
@@ -193,7 +194,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 .topic(topic).subscriptionName("sub1").subscribe()) {
             Assert.fail("Shouldn't be able to consume with a schema from a topic which has no schema set");
         } catch (PulsarClientException e) {
-            Assert.assertTrue(e.getMessage().contains("Trying to subscribe with incompatible schema"));
+            Assert.assertTrue(e instanceof IncompatibleSchemaException);
         }
     }
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 2618066..7aa72e1 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -67,6 +67,12 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    public static class IncompatibleSchemaException extends PulsarClientException {
+        public IncompatibleSchemaException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class LookupException extends PulsarClientException {
         public LookupException(String msg) {
             super(msg);
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index c1975d7..a4bc3b3 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -75,6 +75,8 @@ enum Result
     ResultUnsupportedVersionError,  /// Error when an older client/version doesn't support a required feature
     ResultTopicTerminated,          /// Topic was already terminated
     ResultCryptoError,              /// Error when crypto operation fails
+
+    ResultIncompatibleSchema,  /// Specified schema is incompatible with the topic's schema
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index a19bd44..fbda62b 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -109,6 +109,9 @@ static Result getResult(ServerError serverError) {
 
         case InvalidTopicName:
             return ResultInvalidTopicName;
+
+        case IncompatibleSchema:
+            return ResultIncompatibleSchema;
     }
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 8971381..ff66bf6 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -128,6 +128,9 @@ const char* pulsar::strResult(Result result) {
 
         case ResultProducerBusy:
             return "ProducerBusy";
+
+        case ResultIncompatibleSchema:
+            return "IncompatibleSchema";
     };
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bb47a7e..709a45a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -136,7 +136,7 @@ public class ClientCnx extends PulsarHandler {
             this.requestId = requestId;
         }
     }
-    
+
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
     }
@@ -831,6 +831,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
         case TopicTerminatedError:
             return new PulsarClientException.TopicTerminatedException(errorMsg);
+        case IncompatibleSchema:
+            return new PulsarClientException.IncompatibleSchemaException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);
@@ -862,6 +864,6 @@ public class ClientCnx extends PulsarHandler {
             }
         }
     }
-    
+
     private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 1ee5f40..f0adf06 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -72,6 +72,7 @@ public final class PulsarApi {
     TopicTerminatedError(15, 15),
     ProducerBusy(16, 16),
     InvalidTopicName(17, 17),
+    IncompatibleSchema(18, 18),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -92,6 +93,7 @@ public final class PulsarApi {
     public static final int TopicTerminatedError_VALUE = 15;
     public static final int ProducerBusy_VALUE = 16;
     public static final int InvalidTopicName_VALUE = 17;
+    public static final int IncompatibleSchema_VALUE = 18;
     
     
     public final int getNumber() { return value; }
@@ -116,6 +118,7 @@ public final class PulsarApi {
         case 15: return TopicTerminatedError;
         case 16: return ProducerBusy;
         case 17: return InvalidTopicName;
+        case 18: return IncompatibleSchema;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index aa21798..33b4f69 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -138,6 +138,8 @@ enum ServerError {
 
     ProducerBusy         = 16; // Producer with same name is already connected
     InvalidTopicName = 17; // The topic name is not valid
+
+    IncompatibleSchema = 18; // Specified schema was incompatible with topic schema
 }
 
 enum AuthMethod {