You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/19 15:50:20 UTC
[pulsar] branch master updated: issue#3838 : Allow incompatible
schemas to co-exist on a topic,
allow set-schema-autoupdate-strategy = none (#3840)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e29bfd0 issue#3838 : Allow incompatible schemas to co-exist on a topic, allow set-schema-autoupdate-strategy = none (#3840)
e29bfd0 is described below
commit e29bfd08d012cc1a9d88a6b812f973bdb9374816
Author: Shivji Kumar Jha <sh...@gmail.com>
AuthorDate: Tue Mar 19 21:20:13 2019 +0530
issue#3838 : Allow incompatible schemas to co-exist on a topic, allow set-schema-autoupdate-strategy = none (#3840)
## Problem:
Fixes #3838: The set-schema-autoupdate-strategy policy currently allows either disabling schema updates (--disabled) or choosing one of the --compatibility levels (FULL, BACKWARD, FORWARD). There is currently no way to allow multiple incompatible schemas to co-exist on a topic.
## Solution:
It would be useful to allow --compatibility NONE so that new schemas can be added without being checked against the previous schemas. This allows extra flexibility, for instance having multiple object types on a topic, and supports use cases such as a simple event bus.
### Modifications
This change added tests and can be verified as follows:
- *Added test cases under SchemaUpdateStrategyTest*
---
.../service/schema/AlwaysSchemaValidator.java | 34 +++++++++++++
.../schema/AvroSchemaBasedCompatibilityCheck.java | 2 +
.../schema/SchemaCompatibilityStrategy.java | 7 +++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 2 +
.../SchemaAutoUpdateCompatibilityStrategy.java | 8 +++-
site2/docs/reference-pulsar-admin.md | 2 +-
.../integration/cli/SchemaUpdateStrategyTest.java | 55 +++++++++++++++++++++-
7 files changed, 107 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AlwaysSchemaValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AlwaysSchemaValidator.java
new file mode 100644
index 0000000..e979828
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AlwaysSchemaValidator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidator;
+
+/**
+ * A schema validator that always reports as compatible.
+ */
+class AlwaysSchemaValidator implements SchemaValidator {
+ static AlwaysSchemaValidator INSTANCE = new AlwaysSchemaValidator();
+
+ @Override
+ public void validate(Schema toValidate, Iterable<Schema> existing) {
+ return;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index 41cb3a9..8b30714 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -58,6 +58,8 @@ abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityC
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
case FULL:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
+ case ALWAYS_COMPATIBLE:
+ return AlwaysSchemaValidator.INSTANCE;
default:
return NeverSchemaValidator.INSTANCE;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
index 12e125e..221fc8c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
@@ -27,6 +27,11 @@ public enum SchemaCompatibilityStrategy {
ALWAYS_INCOMPATIBLE,
/**
+ * Always compatible
+ */
+ ALWAYS_COMPATIBLE,
+
+ /**
* Messages written by a new schema can be read by an old schema
*/
BACKWARD,
@@ -52,6 +57,8 @@ public enum SchemaCompatibilityStrategy {
return FORWARD;
case Full:
return FULL;
+ case AlwaysCompatible:
+ return ALWAYS_COMPATIBLE;
case AutoUpdateDisabled:
default:
return ALWAYS_INCOMPATIBLE;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index a107411..8b1b147 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1004,6 +1004,8 @@ public class CmdNamespaces extends CmdBase {
strategy = SchemaAutoUpdateCompatibilityStrategy.Backward;
} else if (strategyStr.equals("FORWARD")) {
strategy = SchemaAutoUpdateCompatibilityStrategy.Forward;
+ } else if (strategyStr.equals("NONE")) {
+ strategy = SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible;
} else {
throw new PulsarAdminException("Either --compatibility or --disabled must be specified");
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
index ac66689..bc437ed 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
@@ -45,5 +45,11 @@ public enum SchemaAutoUpdateCompatibilityStrategy {
/**
* Backward and Forward.
*/
- Full
+ Full,
+
+ /**
+ * Always Compatible - The new schema will not be checked for compatibility against
+ * old schemas. In other words, new schemas will always be assumed compatible.
+ */
+ AlwaysCompatible
}
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index 62a8d38..fd22a42 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1320,7 +1320,7 @@ $ pulsar-admin namespaces set-schema-autoupdate-strategy tenant/namespace option
Options
|Flag|Description|Default|
|----|---|---|
-|`-c`, `--compatibility`|Compatibility level required for new schemas created via a Producer. Possible values (Full, Backward, Forward).||
+|`-c`, `--compatibility`|Compatibility level required for new schemas created via a Producer. Possible values (Full, Backward, Forward, None).||
|`-d`, `--disabled`|Disable automatic schema updates.|false|
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
index c8c4692..5a3ca47 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
@@ -54,7 +54,7 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
"namespaces", "get-schema-autoupdate-strategy", namespace);
Assert.assertEquals(result.getStdout().trim(), "FULL");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
- "--compatibility", "BACKWARD", namespace);
+ "--compatibility", "BACKWARD", namespace);
try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
@@ -76,6 +76,31 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
}
}
+ private void testNone(String namespace, String topicName) throws Exception {
+ ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces", "get-schema-autoupdate-strategy", namespace);
+ Assert.assertEquals(result.getStdout().trim(), "FULL");
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
+ "--compatibility", "NONE", namespace);
+
+ try (PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+ try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+ p.send(new V1Data("test1", 1));
+ }
+
+ log.info("try with forward compat, should succeed");
+ try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+ p.send(new V3Data("test3", 1, 2));
+ }
+
+ log.info("try with backward compat, should succeed");
+ try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+ p.send(new V2Data("test2"));
+ }
+ }
+ }
+
private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-schema-autoupdate-strategy", namespace);
@@ -101,6 +126,11 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
p.send(new V3Data("test2", 1, 2));
}
+
+ log.info("try with fully compat, should succeed");
+ try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+ p.send(new V4Data("test2", 1, (short)100));
+ }
}
}
@@ -281,6 +311,17 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
}
@Test
+ public void testNoneV2() throws Exception {
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
+ pulsarCluster.getClusterName(), "public/none-p-v2");
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
+ pulsarCluster.getClusterName(), "public/none-np-v2");
+
+ testNone("public/none-p-v2", "persistent://public/none-p-v2/topic1");
+ testNone("public/none-np-v2", "non-persistent://public/none-np-v2/topic1");
+ }
+
+ @Test
public void testDisabledV2() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
pulsarCluster.getClusterName(), "public/dis-p-v2");
@@ -328,6 +369,18 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
}
@Test
+ public void testNoneV1() throws Exception {
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
+ "public/" + pulsarCluster.getClusterName() + "/none-p-v1");
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
+ "public/" + pulsarCluster.getClusterName() + "/none-np-v1");
+ testNone("public/" + pulsarCluster.getClusterName() + "/none-p-v1",
+ "persistent://public/" + pulsarCluster.getClusterName() + "/none-p-v1/topic1");
+ testNone("public/" + pulsarCluster.getClusterName() + "/none-np-v1",
+ "persistent://public/" + pulsarCluster.getClusterName() + "/none-np-v1/topic1");
+ }
+
+ @Test
public void testDisabledV1() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
"public/" + pulsarCluster.getClusterName() + "/dis-p-v1");