You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 21:22:10 UTC
[pulsar] branch master updated: [Issue 4251][Schemas] add schema
parsing verification before update (#4252)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c81efee [Issue 4251][Schemas] add schema parsing verification before update (#4252)
c81efee is described below
commit c81efee00d1bdb9766d08199251d40f7551f48f4
Author: Alexandre DUVAL <al...@clever-cloud.com>
AuthorDate: Wed May 15 23:22:04 2019 +0200
[Issue 4251][Schemas] add schema parsing verification before update (#4252)
* add schema parsing verification before update
* use @Slf4j
* replace Boolean object to its primitive boolean
---
.../schema/AvroSchemaBasedCompatibilityCheck.java | 29 ++++++++++++++++------
.../service/schema/SchemaCompatibilityCheck.java | 12 +++++++++
.../service/schema/SchemaRegistryServiceImpl.java | 16 ++++++++----
3 files changed, 45 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index 8b30714..a91edab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -22,27 +22,42 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.Arrays;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.common.schema.SchemaData;
+import lombok.extern.slf4j.Slf4j;
/**
* The abstract implementation of {@link SchemaCompatibilityCheck} using Avro Schema.
*/
+@Slf4j
abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityCheck {
+ @Override
+ public boolean isWellFormed(SchemaData to) {
+ try {
+ Schema.Parser toParser = new Schema.Parser();
+ Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));
+ } catch (SchemaParseException e) {
+ log.error("Error during schema parsing: {}", e.getMessage(), e);
+ return false;
+ }
+ return true;
+ }
@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
- Schema.Parser fromParser = new Schema.Parser();
- Schema fromSchema = fromParser.parse(new String(from.getData(), UTF_8));
- Schema.Parser toParser = new Schema.Parser();
- Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));
-
- SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
try {
+ Schema.Parser fromParser = new Schema.Parser();
+ Schema fromSchema = fromParser.parse(new String(from.getData(), UTF_8));
+ Schema.Parser toParser = new Schema.Parser();
+ Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));
+
+ SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
- } catch (SchemaValidationException e) {
+ } catch (SchemaParseException | SchemaValidationException e) {
+ log.error("Error during schema compatibility check: {}", e.getMessage(), e);
return false;
}
return true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
index 7e8335f..136f41f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
@@ -26,6 +26,13 @@ public interface SchemaCompatibilityCheck {
/**
*
+ * @param to the future schema i.e. the schema sent by the producer
+ * @return whether the schemas are well-formed
+ */
+ boolean isWellFormed(SchemaData to);
+
+ /**
+ *
* @param from the current schema i.e. schema that the broker has
* @param to the future schema i.e. the schema sent by the producer
* @param strategy the strategy to use when comparing schemas
@@ -40,6 +47,11 @@ public interface SchemaCompatibilityCheck {
}
@Override
+ public boolean isWellFormed(SchemaData to) {
+ return true;
+ }
+
+ @Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 9b47368..fe18fa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -88,11 +88,12 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
return getSchema(schemaId)
.thenApply(
(existingSchema) ->
- existingSchema == null
- || existingSchema.schema.isDeleted()
- || isCompatible(existingSchema, schema, strategy))
- .thenCompose(isCompatible -> {
- if (isCompatible) {
+ existingSchema == null ||
+ existingSchema.schema.isDeleted() ||
+ (isWellFormed(schema) &&
+ isCompatible(existingSchema, schema, strategy)))
+ .thenCompose(isValid -> {
+ if (isValid) {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
@@ -144,6 +145,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
.build();
}
+ private boolean isWellFormed(SchemaData schema) {
+ return compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
+ .isWellFormed(schema);
+ }
+
private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema,
SchemaCompatibilityStrategy strategy) {
HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData());