You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/19 15:50:20 UTC

[pulsar] branch master updated: issue#3838 : Allow incompatible schemas to co-exist on a topic, allow set-schema-autoupdate-strategy = none (#3840)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e29bfd0  issue#3838 : Allow incompatible schemas to co-exist on a topic, allow set-schema-autoupdate-strategy = none (#3840)
e29bfd0 is described below

commit e29bfd08d012cc1a9d88a6b812f973bdb9374816
Author: Shivji Kumar Jha <sh...@gmail.com>
AuthorDate: Tue Mar 19 21:20:13 2019 +0530

    issue#3838 : Allow incompatible schemas to co-exist on a topic, allow set-schema-autoupdate-strategy = none (#3840)
    
    ## Problem:
    
    Fixes #3838: The set-schema-autoupdate-strategy policy currently allows either disabling schema updates (--disabled) or choosing one of the --compatibility levels (FULL, BACKWARD, FORWARD). There is currently no way to allow multiple incompatible schemas on a topic.
    
    ## Solution:
    It would be useful to allow `--compatibility NONE` so that schemas can be added without being checked against the previous schemas. This allows extra flexibility, for instance having multiple object types on a topic, and supports use cases such as a simple event bus.
    
    ### Modifications
    
    This change added tests and can be verified as follows:
    
      - *Added test cases under SchemaUpdateStrategyTest*
---
 .../service/schema/AlwaysSchemaValidator.java      | 34 +++++++++++++
 .../schema/AvroSchemaBasedCompatibilityCheck.java  |  2 +
 .../schema/SchemaCompatibilityStrategy.java        |  7 +++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  2 +
 .../SchemaAutoUpdateCompatibilityStrategy.java     |  8 +++-
 site2/docs/reference-pulsar-admin.md               |  2 +-
 .../integration/cli/SchemaUpdateStrategyTest.java  | 55 +++++++++++++++++++++-
 7 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AlwaysSchemaValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AlwaysSchemaValidator.java
new file mode 100644
index 0000000..e979828
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AlwaysSchemaValidator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidator;
+
+/**
+ * A schema validator that always reports as compatible.
+ */
+class AlwaysSchemaValidator implements SchemaValidator {
+    static final AlwaysSchemaValidator INSTANCE = new AlwaysSchemaValidator();
+
+    @Override
+    public void validate(Schema toValidate, Iterable<Schema> existing) {
+        // Intentionally a no-op: nothing is checked and nothing is thrown, so every schema is accepted.
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index 41cb3a9..8b30714 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -58,6 +58,8 @@ abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityC
                 return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
             case FULL:
                 return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
+            case ALWAYS_COMPATIBLE:
+                return AlwaysSchemaValidator.INSTANCE;
             default:
                 return NeverSchemaValidator.INSTANCE;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
index 12e125e..221fc8c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
@@ -27,6 +27,11 @@ public enum SchemaCompatibilityStrategy {
     ALWAYS_INCOMPATIBLE,
 
     /**
+     * Always compatible
+     */
+    ALWAYS_COMPATIBLE,
+
+    /**
      * Messages written by a new schema can be read by an old schema
      */
     BACKWARD,
@@ -52,6 +57,8 @@ public enum SchemaCompatibilityStrategy {
             return FORWARD;
         case Full:
             return FULL;
+        case AlwaysCompatible:
+            return ALWAYS_COMPATIBLE;
         case AutoUpdateDisabled:
         default:
             return ALWAYS_INCOMPATIBLE;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index a107411..8b1b147 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1004,6 +1004,8 @@ public class CmdNamespaces extends CmdBase {
                 strategy = SchemaAutoUpdateCompatibilityStrategy.Backward;
             } else if (strategyStr.equals("FORWARD")) {
                 strategy = SchemaAutoUpdateCompatibilityStrategy.Forward;
+            } else if (strategyStr.equals("NONE")) {
+                strategy = SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible;
             } else {
                 throw new PulsarAdminException("Either --compatibility or --disabled must be specified");
             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
index ac66689..bc437ed 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
@@ -45,5 +45,11 @@ public enum SchemaAutoUpdateCompatibilityStrategy {
     /**
      * Backward and Forward.
      */
-    Full
+    Full,
+
+    /**
+     * Always Compatible - The new schema will not be checked for compatibility against
+     * old schemas. In other words, new schemas will always be assumed to be compatible.
+     */
+    AlwaysCompatible
 }
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index 62a8d38..fd22a42 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1320,7 +1320,7 @@ $ pulsar-admin namespaces set-schema-autoupdate-strategy tenant/namespace option
 Options
 |Flag|Description|Default|
 |----|---|---|
-|`-c`, `--compatibility`|Compatibility level required for new schemas created via a Producer. Possible values (Full, Backward, Forward).||
+|`-c`, `--compatibility`|Compatibility level required for new schemas created via a Producer. Possible values (Full, Backward, Forward, None).||
 |`-d`, `--disabled`|Disable automatic schema updates.|false|
 
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
index c8c4692..5a3ca47 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
@@ -54,7 +54,7 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
                 "namespaces", "get-schema-autoupdate-strategy", namespace);
         Assert.assertEquals(result.getStdout().trim(), "FULL");
         pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
-                                                 "--compatibility", "BACKWARD", namespace);
+                "--compatibility", "BACKWARD", namespace);
 
         try (PulsarClient pulsarClient = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
@@ -76,6 +76,31 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
         }
     }
 
+    private void testNone(String namespace, String topicName) throws Exception {
+        // The namespace is expected to report FULL before the strategy is switched to NONE.
+        ContainerExecResult before = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-schema-autoupdate-strategy", namespace);
+        Assert.assertEquals(before.getStdout().trim(), "FULL");
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
+                "--compatibility", "NONE", namespace);
+        // With NONE in effect, producers for V1Data, V3Data and V2Data are created in turn
+        // on the same topic, and each one sends a single message.
+        String serviceUrl = pulsarCluster.getPlainTextServiceUrl();
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) {
+            try (Producer<V1Data> v1 = client.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+                v1.send(new V1Data("test1", 1));
+            }
+            log.info("try with forward compat, should succeed");
+            try (Producer<V3Data> v3 = client.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+                v3.send(new V3Data("test3", 1, 2));
+            }
+            log.info("try with backward compat, should succeed");
+            try (Producer<V2Data> v2 = client.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+                v2.send(new V2Data("test2"));
+            }
+        }
+    }
+
     private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
         ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
                 "namespaces", "get-schema-autoupdate-strategy", namespace);
@@ -101,6 +126,11 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
                 p.send(new V3Data("test2", 1, 2));
             }
+
+            log.info("try with fully compat, should succeed");
+            try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+                p.send(new V4Data("test2", 1, (short)100));
+            }
         }
 
     }
@@ -281,6 +311,17 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
     }
 
     @Test
+    public void testNoneV2() throws Exception {
+        // Create one namespace per topic domain, then run the NONE scenario against each.
+        String cluster = pulsarCluster.getClusterName();
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, "public/none-p-v2");
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, "public/none-np-v2");
+
+        testNone("public/none-p-v2", "persistent://public/none-p-v2/topic1");
+        testNone("public/none-np-v2", "non-persistent://public/none-np-v2/topic1");
+    }
+
+    @Test
     public void testDisabledV2() throws Exception {
         pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
                                                  pulsarCluster.getClusterName(), "public/dis-p-v2");
@@ -328,6 +369,18 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
     }
 
     @Test
+    public void testNoneV1() throws Exception {
+        // V1-style namespace names embed the cluster: public/<cluster>/<namespace>.
+        String nsPrefix = "public/" + pulsarCluster.getClusterName();
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", nsPrefix + "/none-p-v1");
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", nsPrefix + "/none-np-v1");
+        testNone(nsPrefix + "/none-p-v1", "persistent://" + nsPrefix + "/none-p-v1/topic1");
+        // The "np" namespace is exercised with a non-persistent topic, as in testNoneV2,
+        // so both topic domains are covered under the NONE strategy.
+        testNone(nsPrefix + "/none-np-v1", "non-persistent://" + nsPrefix + "/none-np-v1/topic1");
+    }
+
+    @Test
     public void testDisabledV1() throws Exception {
         pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
                                                  "public/" + pulsarCluster.getClusterName() + "/dis-p-v1");