You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/10 14:46:41 UTC

[GitHub] merlimat closed pull request #3345: Propagate specific Schema error to client

merlimat closed pull request #3345: Propagate specific Schema error to client
URL: https://github.com/apache/pulsar/pull/3345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 31a4ac7838..d037edf711 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 
 /**
@@ -163,6 +164,8 @@ public AlreadyRunningException(String msg) {
         } else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
                 || t instanceof SubscriptionFencedException) {
             return PulsarApi.ServerError.ServiceNotReady;
+        } else if (t instanceof IncompatibleSchemaException) {
+            return PulsarApi.ServerError.IncompatibleSchema;
         } else {
             return PulsarApi.ServerError.UnknownError;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 577d3acea9..e6819bf579 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -97,7 +97,6 @@
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.schema.SchemaData;
-import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
@@ -600,7 +599,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                                                                 readCompacted, initialPosition);
                                                     } else {
                                                         return FutureUtil.failedFuture(
-                                                                new BrokerServiceException(
+                                                                new IncompatibleSchemaException(
                                                                         "Trying to subscribe with incompatible schema"
                                                         ));
                                                     }
@@ -846,7 +845,9 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                             }
 
                             schemaVersionFuture.exceptionally(exception -> {
-                                ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, exception.getMessage()));
+                                ctx.writeAndFlush(Commands.newError(requestId,
+                                        BrokerServiceException.getClientErrorCode(exception.getCause()),
+                                        exception.getMessage()));
                                 producers.remove(producerId, producerFuture);
                                 return null;
                             });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 7d524a9f84..5516e331a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -22,6 +22,7 @@
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -156,7 +157,7 @@ public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
             }
         } catch (PulsarClientException e) {
             if (schemaValidationEnforced) {
-                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+                Assert.assertTrue(e instanceof IncompatibleSchemaException);
             } else {
                 Assert.fail("Shouldn't throw IncompatibleSchemaException"
                     + " if SchemaValidationEnforced is disabled");
@@ -193,7 +194,7 @@ public void newConsumerWithSchemaOnExistingTopicWithoutSchema() throws Exception
                 .topic(topic).subscriptionName("sub1").subscribe()) {
             Assert.fail("Shouldn't be able to consume with a schema from a topic which has no schema set");
         } catch (PulsarClientException e) {
-            Assert.assertTrue(e.getMessage().contains("Trying to subscribe with incompatible schema"));
+            Assert.assertTrue(e instanceof IncompatibleSchemaException);
         }
     }
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 2618066c13..7aa72e1367 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -67,6 +67,12 @@ public TimeoutException(String msg) {
         }
     }
 
+    public static class IncompatibleSchemaException extends PulsarClientException {
+        public IncompatibleSchemaException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class LookupException extends PulsarClientException {
         public LookupException(String msg) {
             super(msg);
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index c1975d7ac8..a4bc3b316f 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -75,6 +75,8 @@ enum Result
     ResultUnsupportedVersionError,  /// Error when an older client/version doesn't support a required feature
     ResultTopicTerminated,          /// Topic was already terminated
     ResultCryptoError,              /// Error when crypto operation fails
+
+    ResultIncompatibleSchema,  /// Specified schema is incompatible with the topic's schema
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index a19bd44d52..fbda62b180 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -109,6 +109,9 @@ static Result getResult(ServerError serverError) {
 
         case InvalidTopicName:
             return ResultInvalidTopicName;
+
+        case IncompatibleSchema:
+            return ResultIncompatibleSchema;
     }
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 89713815ad..ff66bf6317 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -128,6 +128,9 @@ const char* pulsar::strResult(Result result) {
 
         case ResultProducerBusy:
             return "ProducerBusy";
+
+        case ResultIncompatibleSchema:
+            return "IncompatibleSchema";
     };
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bb47a7e8f3..709a45a5cb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -136,7 +136,7 @@ public RequestTime(long creationTime, long requestId) {
             this.requestId = requestId;
         }
     }
-    
+
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
     }
@@ -831,6 +831,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String
             return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
         case TopicTerminatedError:
             return new PulsarClientException.TopicTerminatedException(errorMsg);
+        case IncompatibleSchema:
+            return new PulsarClientException.IncompatibleSchemaException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);
@@ -862,6 +864,6 @@ private void checkRequestTimeout() {
             }
         }
     }
-    
+
     private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 1ee5f40e97..f0adf0650d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -72,6 +72,7 @@ private CompressionType(int index, int value) {
     TopicTerminatedError(15, 15),
     ProducerBusy(16, 16),
     InvalidTopicName(17, 17),
+    IncompatibleSchema(18, 18),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -92,6 +93,7 @@ private CompressionType(int index, int value) {
     public static final int TopicTerminatedError_VALUE = 15;
     public static final int ProducerBusy_VALUE = 16;
     public static final int InvalidTopicName_VALUE = 17;
+    public static final int IncompatibleSchema_VALUE = 18;
     
     
     public final int getNumber() { return value; }
@@ -116,6 +118,7 @@ public static ServerError valueOf(int value) {
         case 15: return TopicTerminatedError;
         case 16: return ProducerBusy;
         case 17: return InvalidTopicName;
+        case 18: return IncompatibleSchema;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index aa21798dd3..33b4f699db 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -138,6 +138,8 @@ enum ServerError {
 
     ProducerBusy         = 16; // Producer with same name is already connected
     InvalidTopicName = 17; // The topic name is not valid
+
+    IncompatibleSchema = 18; // Specified schema was incompatible with topic schema
 }
 
 enum AuthMethod {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services