You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/10 14:46:44 UTC
[pulsar] branch master updated: Propagate specific Schema error to client (#3345)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e9a5e61 Propagate specific Schema error to client (#3345)
e9a5e61 is described below
commit e9a5e61f06db9780669b39a96c5c29428334a0fe
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jan 10 06:46:39 2019 -0800
Propagate specific Schema error to client (#3345)
* Propagate specific Schema error to client
* Handling new enums in C++
* Fixed formatting
---
.../org/apache/pulsar/broker/service/BrokerServiceException.java | 3 +++
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 7 ++++---
.../test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java | 5 +++--
.../java/org/apache/pulsar/client/api/PulsarClientException.java | 6 ++++++
pulsar-client-cpp/include/pulsar/Result.h | 2 ++
pulsar-client-cpp/lib/ClientConnection.cc | 3 +++
pulsar-client-cpp/lib/Result.cc | 3 +++
.../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 6 ++++--
.../main/java/org/apache/pulsar/common/api/proto/PulsarApi.java | 3 +++
pulsar-common/src/main/proto/PulsarApi.proto | 2 ++
10 files changed, 33 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 31a4ac7..d037edf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.common.api.proto.PulsarApi;
/**
@@ -163,6 +164,8 @@ public class BrokerServiceException extends Exception {
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
+ } else if (t instanceof IncompatibleSchemaException) {
+ return PulsarApi.ServerError.IncompatibleSchema;
} else {
return PulsarApi.ServerError.UnknownError;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 577d3ac..e6819bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -97,7 +97,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.schema.SchemaData;
-import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
@@ -600,7 +599,7 @@ public class ServerCnx extends PulsarHandler {
readCompacted, initialPosition);
} else {
return FutureUtil.failedFuture(
- new BrokerServiceException(
+ new IncompatibleSchemaException(
"Trying to subscribe with incompatible schema"
));
}
@@ -846,7 +845,9 @@ public class ServerCnx extends PulsarHandler {
}
schemaVersionFuture.exceptionally(exception -> {
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, exception.getMessage()));
+ ctx.writeAndFlush(Commands.newError(requestId,
+ BrokerServiceException.getClientErrorCode(exception.getCause()),
+ exception.getMessage()));
producers.remove(producerId, producerFuture);
return null;
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 7d524a9..5516e33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -156,7 +157,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
}
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
- Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+ Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
+ " if SchemaValidationEnforced is disabled");
@@ -193,7 +194,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
.topic(topic).subscriptionName("sub1").subscribe()) {
Assert.fail("Shouldn't be able to consume with a schema from a topic which has no schema set");
} catch (PulsarClientException e) {
- Assert.assertTrue(e.getMessage().contains("Trying to subscribe with incompatible schema"));
+ Assert.assertTrue(e instanceof IncompatibleSchemaException);
}
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 2618066..7aa72e1 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -67,6 +67,12 @@ public class PulsarClientException extends IOException {
}
}
+ public static class IncompatibleSchemaException extends PulsarClientException {
+ public IncompatibleSchemaException(String msg) {
+ super(msg);
+ }
+ }
+
public static class LookupException extends PulsarClientException {
public LookupException(String msg) {
super(msg);
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index c1975d7..a4bc3b3 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -75,6 +75,8 @@ enum Result
ResultUnsupportedVersionError, /// Error when an older client/version doesn't support a required feature
ResultTopicTerminated, /// Topic was already terminated
ResultCryptoError, /// Error when crypto operation fails
+
+ ResultIncompatibleSchema, /// Specified schema is incompatible with the topic's schema
};
// Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index a19bd44..fbda62b 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -109,6 +109,9 @@ static Result getResult(ServerError serverError) {
case InvalidTopicName:
return ResultInvalidTopicName;
+
+ case IncompatibleSchema:
+ return ResultIncompatibleSchema;
}
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 8971381..ff66bf6 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -128,6 +128,9 @@ const char* pulsar::strResult(Result result) {
case ResultProducerBusy:
return "ProducerBusy";
+
+ case ResultIncompatibleSchema:
+ return "IncompatibleSchema";
};
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bb47a7e..709a45a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -136,7 +136,7 @@ public class ClientCnx extends PulsarHandler {
this.requestId = requestId;
}
}
-
+
public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
}
@@ -831,6 +831,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
case TopicTerminatedError:
return new PulsarClientException.TopicTerminatedException(errorMsg);
+ case IncompatibleSchema:
+ return new PulsarClientException.IncompatibleSchemaException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
@@ -862,6 +864,6 @@ public class ClientCnx extends PulsarHandler {
}
}
}
-
+
private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 1ee5f40..f0adf06 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -72,6 +72,7 @@ public final class PulsarApi {
TopicTerminatedError(15, 15),
ProducerBusy(16, 16),
InvalidTopicName(17, 17),
+ IncompatibleSchema(18, 18),
;
public static final int UnknownError_VALUE = 0;
@@ -92,6 +93,7 @@ public final class PulsarApi {
public static final int TopicTerminatedError_VALUE = 15;
public static final int ProducerBusy_VALUE = 16;
public static final int InvalidTopicName_VALUE = 17;
+ public static final int IncompatibleSchema_VALUE = 18;
public final int getNumber() { return value; }
@@ -116,6 +118,7 @@ public final class PulsarApi {
case 15: return TopicTerminatedError;
case 16: return ProducerBusy;
case 17: return InvalidTopicName;
+ case 18: return IncompatibleSchema;
default: return null;
}
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index aa21798..33b4f69 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -138,6 +138,8 @@ enum ServerError {
ProducerBusy = 16; // Producer with same name is already connected
InvalidTopicName = 17; // The topic name is not valid
+
+ IncompatibleSchema = 18; // Specified schema was incompatible with topic schema
}
enum AuthMethod {