You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 21:22:10 UTC

[pulsar] branch master updated: [Issue 4251][Schemas] add schema parsing verification before update (#4252)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c81efee  [Issue 4251][Schemas] add schema parsing verification before update (#4252)
c81efee is described below

commit c81efee00d1bdb9766d08199251d40f7551f48f4
Author: Alexandre DUVAL <al...@clever-cloud.com>
AuthorDate: Wed May 15 23:22:04 2019 +0200

    [Issue 4251][Schemas] add schema parsing verification before update (#4252)
    
    * add schema parsing verification before update
    
    * use @Slf4j
    
    * replace Boolean object to its primitive boolean
---
 .../schema/AvroSchemaBasedCompatibilityCheck.java  | 29 ++++++++++++++++------
 .../service/schema/SchemaCompatibilityCheck.java   | 12 +++++++++
 .../service/schema/SchemaRegistryServiceImpl.java  | 16 ++++++++----
 3 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index 8b30714..a91edab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -22,27 +22,42 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.util.Arrays;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
 import org.apache.avro.SchemaValidationException;
 import org.apache.avro.SchemaValidator;
 import org.apache.avro.SchemaValidatorBuilder;
 import org.apache.pulsar.common.schema.SchemaData;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * The abstract implementation of {@link SchemaCompatibilityCheck} using Avro Schema.
  */
+@Slf4j
 abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityCheck {
+    @Override
+    public boolean isWellFormed(SchemaData to) {
+        try {
+            Schema.Parser toParser = new Schema.Parser();
+            Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));
+        } catch (SchemaParseException e) {
+            log.error("Error during schema parsing: {}", e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
 
     @Override
     public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
-        Schema.Parser fromParser = new Schema.Parser();
-        Schema fromSchema = fromParser.parse(new String(from.getData(), UTF_8));
-        Schema.Parser toParser = new Schema.Parser();
-        Schema toSchema =  toParser.parse(new String(to.getData(), UTF_8));
-
-        SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
         try {
+            Schema.Parser fromParser = new Schema.Parser();
+            Schema fromSchema = fromParser.parse(new String(from.getData(), UTF_8));
+            Schema.Parser toParser = new Schema.Parser();
+            Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));
+
+            SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
             schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
-        } catch (SchemaValidationException e) {
+        } catch (SchemaParseException | SchemaValidationException e) {
+            log.error("Error during schema compatibility check: {}", e.getMessage(), e);
             return false;
         }
         return true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
index 7e8335f..136f41f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
@@ -26,6 +26,13 @@ public interface SchemaCompatibilityCheck {
 
     /**
      *
+     * @param to the future schema i.e. the schema sent by the producer
+     * @return whether the schemas are well-formed
+     */
+    boolean isWellFormed(SchemaData to);
+
+    /**
+     *
      * @param from the current schema i.e. schema that the broker has
      * @param to the future schema i.e. the schema sent by the producer
      * @param strategy the strategy to use when comparing schemas
@@ -40,6 +47,11 @@ public interface SchemaCompatibilityCheck {
         }
 
         @Override
+        public boolean isWellFormed(SchemaData to) {
+            return true;
+        }
+
+        @Override
         public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
             if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
                 return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 9b47368..fe18fa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -88,11 +88,12 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
         return getSchema(schemaId)
             .thenApply(
                 (existingSchema) ->
-                    existingSchema == null
-                        || existingSchema.schema.isDeleted()
-                        || isCompatible(existingSchema, schema, strategy))
-            .thenCompose(isCompatible -> {
-                    if (isCompatible) {
+                    existingSchema == null ||
+                    existingSchema.schema.isDeleted() ||
+                    (isWellFormed(schema) &&
+                    isCompatible(existingSchema, schema, strategy)))
+            .thenCompose(isValid -> {
+                    if (isValid) {
                         byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
                         SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
                             .setType(Functions.convertFromDomainType(schema.getType()))
@@ -144,6 +145,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             .build();
     }
 
+    private boolean isWellFormed(SchemaData schema) {
+        return compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
+            .isWellFormed(schema);
+    }
+
     private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema,
                                  SchemaCompatibilityStrategy strategy) {
         HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData());