You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/12 18:39:45 UTC

[GitHub] ivankelly closed pull request #2691: Automatic schema update can be disabled through admin interface

ivankelly closed pull request #2691: Automatic schema update can be disabled through admin interface
URL: https://github.com/apache/pulsar/pull/2691
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4beba60cf3..65c7d38a5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -39,6 +39,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
@@ -73,6 +74,7 @@
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -1651,5 +1653,53 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
         }
     }
 
+    protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
+    }
+
+    protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        mutatePolicy((policies) -> {
+                policies.schema_auto_update_compatibility_strategy = strategy;
+                return policies;
+            }, (policies) -> policies.schema_auto_update_compatibility_strategy,
+            "schemaAutoUpdateCompatibilityStrategy");
+    }
+
+
+    private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation,
+                                  Function<Policies, T> getter,
+                                  String policyName) {
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            policies = policyTransformation.apply(policies);
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated {} configuration: namespace={}, value={}",
+                     clientAppId(), policyName, namespaceName, getter.apply(policies));
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist",
+                     clientAppId(), policyName, namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
+                     clientAppId(), policyName, namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update {} configuration for namespace {}",
+                      clientAppId(), policyName, namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 42b89fcf15..561351da91 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -771,5 +772,39 @@ public void setOffloadThreshold(@PathParam("property") String property,
         internalSetOffloadThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{tenant}/{cluster}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "The strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(
+            @PathParam("tenant") String tenant,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, cluster, namespace);
+        return internalGetSchemaAutoUpdateCompatibilityStrategy();
+    }
+
+    @PUT
+    @Path("/{tenant}/{cluster}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "Update the strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String tenant,
+                                                         @PathParam("cluster") String cluster,
+                                                         @PathParam("namespace") String namespace,
+                                                         SchemaAutoUpdateCompatibilityStrategy strategy) {
+        validateNamespaceName(tenant, cluster, namespace);
+        internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 1e6e8c2ab2..e335521053 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,6 +46,7 @@
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -762,5 +763,37 @@ public void clearOffloadDeletionLag(@PathParam("tenant") String tenant,
         internalSetOffloadDeletionLag(null);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "The strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetSchemaAutoUpdateCompatibilityStrategy();
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "Update the strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String tenant,
+                                                         @PathParam("namespace") String namespace,
+                                                         SchemaAutoUpdateCompatibilityStrategy strategy) {
+        validateNamespaceName(tenant, namespace);
+        internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 6076276aa7..314d37a7f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -50,6 +50,7 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.DeleteSchemaResponse;
@@ -273,7 +274,8 @@ public void postSchema(
                 .timestamp(clock.millis())
                 .type(SchemaType.valueOf(payload.getType()))
                 .user(defaultIfEmpty(clientAppId(), ""))
-                .build()
+                .build(),
+            SchemaCompatibilityStrategy.FULL
         ).thenAccept(version ->
             response.resume(
                 Response.accepted().entity(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 9527191cea..e6a62b0fb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -64,6 +64,7 @@
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -138,6 +139,8 @@ protected TopicStats initialValue() {
 
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
+    private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+        SchemaCompatibilityStrategy.FULL;
 
     private static class TopicStats {
         public double averageMsgSize;
@@ -180,6 +183,9 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
                     .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             isEncryptionRequired = policies.encryption_required;
+            schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                    policies.schema_auto_update_compatibility_strategy);
+
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
             isEncryptionRequired = false;
@@ -947,6 +953,9 @@ public void checkInactiveSubscriptions() {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
         }
         isEncryptionRequired = data.encryption_required;
+        schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                data.schema_auto_update_compatibility_strategy);
+
         producers.forEach(producer -> {
             producer.checkPermissions();
             producer.checkEncryption();
@@ -1021,7 +1030,7 @@ public void markBatchMessagePublished() {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .putSchemaIfAbsent(id, schema);
+            .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
@@ -1030,7 +1039,7 @@ public void markBatchMessagePublished() {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .isCompatibleWithLatestVersion(id, schema);
+            .isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 34703faf84..d0be156b70 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -79,6 +79,7 @@
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -180,6 +181,8 @@
 
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
+    private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+        SchemaCompatibilityStrategy.FULL;
 
     private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
         @Override
@@ -255,6 +258,9 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
                     .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             isEncryptionRequired = policies.encryption_required;
+
+            schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                    policies.schema_auto_update_compatibility_strategy);
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
             isEncryptionRequired = false;
@@ -1555,6 +1561,10 @@ private boolean shouldTopicBeRetained() {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
         }
         isEncryptionRequired = data.encryption_required;
+
+        schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                data.schema_auto_update_compatibility_strategy);
+
         producers.forEach(producer -> {
             producer.checkPermissions();
             producer.checkEncryption();
@@ -1815,7 +1825,7 @@ public synchronized OffloadProcessStatus offloadStatus() {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .putSchemaIfAbsent(id, schema);
+            .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
@@ -1824,7 +1834,7 @@ public synchronized OffloadProcessStatus offloadStatus() {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .isCompatibleWithLatestVersion(id, schema);
+            .isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java
index b7dd6d2c37..2a3dc75c0e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java
@@ -28,17 +28,11 @@
 
 import java.util.Arrays;
 
-public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
-
-    private final SchemaCompatibilityStrategy compatibilityStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-    public AvroSchemaCompatibilityCheck () {
-        this(SchemaCompatibilityStrategy.FULL);
-    }
-
-    public AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy compatibilityStrategy) {
-        this.compatibilityStrategy = compatibilityStrategy;
-    }
+public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
+    private final static Logger log = LoggerFactory.getLogger(AvroSchemaCompatibilityCheck.class);
 
     @Override
     public SchemaType getSchemaType() {
@@ -46,14 +40,13 @@ public SchemaType getSchemaType() {
     }
 
     @Override
-    public boolean isCompatible(SchemaData from, SchemaData to) {
-
+    public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
         Schema.Parser fromParser = new Schema.Parser();
         Schema fromSchema = fromParser.parse(new String(from.getData()));
         Schema.Parser toParser = new Schema.Parser();
         Schema toSchema =  toParser.parse(new String(to.getData()));
 
-        SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true);
+        SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
         try {
             schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
         } catch (SchemaValidationException e) {
@@ -70,8 +63,10 @@ private static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy
                 return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
             case FORWARD:
                 return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
-            default:
+            case FULL:
                 return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
+            default:
+                return NeverSchemaValidator.INSTANCE;
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
index fef288c64b..2988079660 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -36,7 +36,8 @@
     }
 
     @Override
-    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
+                                                              SchemaCompatibilityStrategy strategy) {
         return completedFuture(null);
     }
 
@@ -51,7 +52,8 @@ public SchemaVersion versionFromBytes(byte[] version) {
     }
 
     @Override
-    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema) {
+    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
+                                                                    SchemaCompatibilityStrategy strategy) {
         return CompletableFuture.completedFuture(true);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
index f3a5e62669..68e596133a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
@@ -34,28 +34,17 @@
 @SuppressWarnings("unused")
 public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
 
-    private final SchemaCompatibilityStrategy compatibilityStrategy;
-
-    public JsonSchemaCompatibilityCheck () {
-        this(SchemaCompatibilityStrategy.FULL);
-    }
-
-    public JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy compatibilityStrategy) {
-        this.compatibilityStrategy = compatibilityStrategy;
-    }
-
     @Override
     public SchemaType getSchemaType() {
         return SchemaType.JSON;
     }
 
     @Override
-    public boolean isCompatible(SchemaData from, SchemaData to) {
-
+    public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
         if (isAvroSchema(from)) {
             if (isAvroSchema(to)) {
                 // if both producer and broker have the schema in avro format
-                return isCompatibleAvroSchema(from, to);
+                return isCompatibleAvroSchema(from, to, strategy);
             } else if (isJsonSchema(to)) {
                 // if broker have the schema in avro format but producer sent a schema in the old json format
                 // allow old schema format for backwards compatiblity
@@ -85,13 +74,13 @@ public boolean isCompatible(SchemaData from, SchemaData to) {
         }
     }
 
-    private boolean isCompatibleAvroSchema(SchemaData from, SchemaData to) {
+    private boolean isCompatibleAvroSchema(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
         Schema.Parser fromParser = new Schema.Parser();
         Schema fromSchema = fromParser.parse(new String(from.getData()));
         Schema.Parser toParser = new Schema.Parser();
         Schema toSchema =  toParser.parse(new String(to.getData()));
 
-        SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true);
+        SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
         try {
             schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
         } catch (SchemaValidationException e) {
@@ -148,8 +137,10 @@ private static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy
                 return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
             case FORWARD:
                 return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
-            default:
+            case FULL:
                 return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
+            default:
+                return NeverSchemaValidator.INSTANCE;
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/NeverSchemaValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/NeverSchemaValidator.java
new file mode 100644
index 0000000000..ac3b64f626
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/NeverSchemaValidator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidationException;
+import org.apache.avro.SchemaValidator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An avro schema validator that always reports as incompatible, if there is an existing schema.
+ */
+class NeverSchemaValidator implements SchemaValidator {
+    private final static Logger log = LoggerFactory.getLogger(NeverSchemaValidator.class);
+    static NeverSchemaValidator INSTANCE = new NeverSchemaValidator();
+
+    @Override
+    public void validate(Schema toValidate, Iterable<Schema> existing)
+            throws SchemaValidationException {
+        for (Schema s : existing) {
+            // only throw exception if there are existing schemas
+            throw new SchemaValidationException(toValidate, toValidate);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java
index 0e4dafe3e9..7b78eb46dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java
@@ -22,14 +22,6 @@
 
 public class ProtobufSchemaCompatibilityCheck extends AvroSchemaCompatibilityCheck {
 
-    public ProtobufSchemaCompatibilityCheck () {
-        this(SchemaCompatibilityStrategy.FULL);
-    }
-
-    public ProtobufSchemaCompatibilityCheck (SchemaCompatibilityStrategy compatibilityStrategy) {
-        super(compatibilityStrategy);
-    }
-
     @Override
     public SchemaType getSchemaType() {
         return SchemaType.PROTOBUF;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
index eea0e7891b..7e8335f4bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
@@ -28,9 +28,10 @@
      *
      * @param from the current schema i.e. schema that the broker has
      * @param to the future schema i.e. the schema sent by the producer
+     * @param strategy the strategy to use when comparing schemas
      * @return whether the schemas are compatible
      */
-    boolean isCompatible(SchemaData from, SchemaData to);
+    boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy);
 
     SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() {
         @Override
@@ -39,8 +40,12 @@ public SchemaType getSchemaType() {
         }
 
         @Override
-        public boolean isCompatible(SchemaData from, SchemaData to) {
-            return true;
+        public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
+            if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
+                return false;
+            } else {
+                return true;
+            }
         }
     };
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
index 85c9d503b0..12e125e230 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
@@ -18,8 +18,43 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+
 public enum SchemaCompatibilityStrategy {
+    /**
+     * Always incompatible
+     */
+    ALWAYS_INCOMPATIBLE,
+
+    /**
+     * Messages written by a new schema can be read by an old schema
+     */
     BACKWARD,
+
+    /**
+     * Messages written by an old schema can be read be a new schema
+     */
     FORWARD,
-    FULL
+
+    /**
+     * Equivalent to both FORWARD and BACKWARD
+     */
+    FULL;
+
+    public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) {
+        if (strategy == null) {
+            return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE;
+        }
+        switch (strategy) {
+        case Backward:
+            return BACKWARD;
+        case Forward:
+            return FORWARD;
+        case Full:
+            return FULL;
+        case AutoUpdateDisabled:
+        default:
+            return ALWAYS_INCOMPATIBLE;
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
index 8a2e6abe2c..6b8ea24797 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -30,11 +30,13 @@
 
     CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version);
 
-    CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema);
+    CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
+                                                       SchemaCompatibilityStrategy strategy);
 
     CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user);
 
-    CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema);
+    CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
+                                                             SchemaCompatibilityStrategy strategy);
 
     SchemaVersion versionFromBytes(byte[] version);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index aa0a39eba2..dc5a6817da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -23,6 +23,7 @@
 import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.HashCode;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
@@ -82,9 +83,10 @@
 
     @Override
     @NotNull
-    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
+                                                              SchemaCompatibilityStrategy strategy) {
         return getSchema(schemaId).thenApply(
-                (existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema))
+                (existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema, strategy))
             .thenCompose(isCompatible -> {
                     if (isCompatible) {
                         byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
@@ -112,8 +114,9 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema) {
-        return checkCompatibilityWithLatest(schemaId, schema);
+    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
+                                                                    SchemaCompatibilityStrategy strategy) {
+        return checkCompatibilityWithLatest(schemaId, schema, strategy);
     }
 
     @Override
@@ -137,14 +140,19 @@ public void close() throws Exception {
             .build();
     }
 
-    private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema) {
-        return compatibilityChecks.getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
-            .isCompatible(existingSchema.schema, newSchema);
+    private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema,
+                                 SchemaCompatibilityStrategy strategy) {
+        HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData());
+        HashCode newHash = hashFunction.hashBytes(newSchema.getData());
+        return newHash.equals(existingHash) ||
+            compatibilityChecks.getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
+            .isCompatible(existingSchema.schema, newSchema, strategy);
     }
 
-    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) {
+    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
+                                                                    SchemaCompatibilityStrategy strategy) {
         return getSchema(schemaId).thenApply(
-                (existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema));
+                (existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema, strategy));
     }
 
     interface Functions {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
new file mode 100644
index 0000000000..5060c8867f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import org.apache.avro.reflect.AvroAlias;
+import org.apache.avro.reflect.AvroDefault;
+import com.google.common.collect.Sets;
+
+import java.lang.reflect.Field;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("prop-xyz", tenantInfo);
+        admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test"));
+        admin.namespaces().createNamespace("prop-xyz/test/ns1");
+        admin.namespaces().createNamespace("prop-xyz/ns2", Sets.newHashSet("test"));
+        admin.namespaces().createNamespace("prop-xyz/test/ns2");
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private void testAutoUpdateBackward(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                                                                    SchemaAutoUpdateCompatibilityStrategy.Backward);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+
+        log.info("try with forward compat, should fail");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            Assert.fail("Forward compat schema should be rejected");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with backward compat, should succeed");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            p.send(new V2Data("test2"));
+        }
+
+    }
+
+    private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                                                                    SchemaAutoUpdateCompatibilityStrategy.Forward);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+
+        log.info("try with backward compat, should fail");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            Assert.fail("Backward compat schema should be rejected");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with forward compat, should succeed");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            p.send(new V3Data("test2", 1, 2));
+        }
+    }
+
+    private void testAutoUpdateFull(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+
+        log.info("try with backward compat only, should fail");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            Assert.fail("Backward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with forward compat only, should fail");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            Assert.fail("Forward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with fully compat");
+        try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+            p.send(new V4Data("test2", 1, (short)100));
+        }
+    }
+
+    private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+        log.info("try with backward compat only, should fail");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            Assert.fail("Backward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with forward compat only, should fail");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            Assert.fail("Forward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with fully compat, should fail");
+        try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+            Assert.fail("Fully compat schema should fail, autoupdate disabled");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("Should still be able to connect with original schema");
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test2", 2));
+        }
+
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                SchemaAutoUpdateCompatibilityStrategy.Full);
+
+        for (int i = 0; i < 100; i++) {
+            Topic t = pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+            // get around fact that field is private and topic can be persisent or non-persistent
+            Field strategy = t.getClass().getDeclaredField("schemaCompatibilityStrategy");
+            strategy.setAccessible(true);
+            if (((SchemaCompatibilityStrategy)strategy.get(t)) == SchemaCompatibilityStrategy.FULL) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        log.info("try with fully compat, again");
+        try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+            p.send(new V4Data("test2", 1, (short)100));
+        }
+    }
+
+    @AvroAlias(space="blah", alias="data")
+    static class V1Data {
+        String foo;
+        int bar;
+
+        V1Data(String foo, int bar) {
+            this.foo = foo;
+            this.bar = bar;
+        }
+    }
+
+    // backward compatible with V1Data
+    @AvroAlias(space="blah", alias="data")
+    static class V2Data {
+        String foo;
+
+        V2Data(String foo) {
+            this.foo = foo;
+        }
+    }
+
+    // forward compatible with V1Data
+    @AvroAlias(space="blah", alias="data")
+    static class V3Data {
+        String foo;
+        int bar;
+        long baz;
+
+        V3Data(String foo, int bar, long baz) {
+            this.foo = foo;
+            this.bar = bar;
+            this.baz = baz;
+        }
+    }
+
+    // fully compatible with V1Data
+    @AvroAlias(space="blah", alias="data")
+    static class V4Data {
+        String foo;
+        int bar;
+        @AvroDefault(value = "10")
+        short blah;
+
+        V4Data(String foo, int bar, short blah) {
+            this.foo = foo;
+            this.bar = bar;
+            this.blah = blah;
+        }
+    }
+
+    @Test
+    public void testBackwardV2() throws Exception {
+        testAutoUpdateBackward("prop-xyz/ns1", "persistent://prop-xyz/ns1/backward");
+        testAutoUpdateBackward("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/backward-np");
+    }
+
+    @Test
+    public void testForwardV2() throws Exception {
+        testAutoUpdateForward("prop-xyz/ns1", "persistent://prop-xyz/ns1/forward");
+        testAutoUpdateForward("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/forward-np");
+    }
+
+    @Test
+    public void testFullV2() throws Exception {
+        testAutoUpdateFull("prop-xyz/ns1", "persistent://prop-xyz/ns1/full");
+        testAutoUpdateFull("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/full-np");
+    }
+
+    @Test
+    public void testDisabledV2() throws Exception {
+        testAutoUpdateDisabled("prop-xyz/ns1", "persistent://prop-xyz/ns1/disabled");
+        testAutoUpdateDisabled("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/disabled-np");
+    }
+
+    @Test
+    public void testBackwardV1() throws Exception {
+        testAutoUpdateBackward("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/backward");
+        testAutoUpdateBackward("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/backward-np");
+    }
+
+    @Test
+    public void testForwardV1() throws Exception {
+        testAutoUpdateForward("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/forward");
+        testAutoUpdateForward("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/forward-np");
+    }
+
+    @Test
+    public void testFullV1() throws Exception {
+        testAutoUpdateFull("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/full");
+        testAutoUpdateFull("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/full-np");
+    }
+
+    @Test
+    public void testDisabledV1() throws Exception {
+        testAutoUpdateDisabled("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/disabled");
+        testAutoUpdateDisabled("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/disabled-np");
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java
index 269a77215a..695d6645b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java
@@ -26,17 +26,8 @@
 public class AvroSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
 
     @Override
-    public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
-        return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
+    public SchemaCompatibilityCheck getSchemaCheck() {
+        return new AvroSchemaCompatibilityCheck();
     }
 
-    @Override
-    public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
-        return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
-        return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java
index b54e952429..67d8432532 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java
@@ -70,12 +70,7 @@
     private static final SchemaData schemaData7 = getSchemaData(schemaJson7);
 
 
-    public abstract SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck();
-
-    public abstract SchemaCompatibilityCheck getForwardCompatibleSchemaCheck();
-
-    public abstract SchemaCompatibilityCheck getFullCompatibleSchemaCheck();
-
+    public abstract SchemaCompatibilityCheck getSchemaCheck();
 
     /**
      * make sure new schema is backwards compatible with latest
@@ -83,27 +78,34 @@
     @Test
     public void testBackwardCompatibility() {
 
-        SchemaCompatibilityCheck schemaCompatibilityCheck = getBackwardsCompatibleSchemaCheck();
+        SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck();
         // adding a field with default is backwards compatible
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2,
+                                                                SchemaCompatibilityStrategy.BACKWARD),
                 "adding a field with default is backwards compatible");
         // adding a field without default is NOT backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "adding a field without default is NOT backwards compatible");
         // Modifying a field name is not backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "Modifying a field name is not backwards compatible");
         // evolving field to a union is backwards compatible
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData5),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData5,
+                                                                SchemaCompatibilityStrategy.BACKWARD),
                 "evolving field to a union is backwards compatible");
         // removing a field from a union is NOT backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData1),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData1,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "removing a field from a union is NOT backwards compatible");
         // adding a field to a union is backwards compatible
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData6),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData6,
+                                                                SchemaCompatibilityStrategy.BACKWARD),
                 "adding a field to a union is backwards compatible");
         // removing a field a union is NOT backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData6, schemaData5),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData6, schemaData5,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "removing a field a union is NOT backwards compatible");
     }
 
@@ -113,21 +115,28 @@ public void testBackwardCompatibility() {
     @Test
     public void testForwardCompatibility() {
 
-        SchemaCompatibilityCheck schemaCompatibilityCheck = getForwardCompatibleSchemaCheck();
+        SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck();
 
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData3),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData3,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData7),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData7,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "removing fields is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData1),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData1,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "removing fields with defaults forward compatible");
     }
 
@@ -136,12 +145,15 @@ public void testForwardCompatibility() {
      */
     @Test
     public void testFullCompatibility() {
-        SchemaCompatibilityCheck schemaCompatibilityCheck = getFullCompatibleSchemaCheck();
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
+        SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck();
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2,
+                                                                SchemaCompatibilityStrategy.FULL),
                 "adding a field with default fully compatible");
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
+                                                                 SchemaCompatibilityStrategy.FULL),
                 "adding a field without default is not fully compatible");
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData1),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData1,
+                                                                 SchemaCompatibilityStrategy.FULL),
                 "adding a field without default is not fully compatible");
 
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 341487f695..9a8a53930f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -46,18 +46,8 @@
 public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
 
     @Override
-    public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
-        return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
-        return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
-        return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
+    public SchemaCompatibilityCheck getSchemaCheck() {
+        return new JsonSchemaCompatibilityCheck();
     }
 
     @Test
@@ -66,11 +56,11 @@ public void testJsonSchemaBackwardsCompatibility() throws JsonProcessingExceptio
         SchemaData from = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         SchemaData to = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new JsonSchemaCompatibilityCheck();
-        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to));
+        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
 
         from = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         to = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
-        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to));
+        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
     }
 
     @Data
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
index adc1ecfdd3..34e1ea843e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
@@ -21,17 +21,7 @@
 public class ProtobufSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
 
     @Override
-    public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
+    public SchemaCompatibilityCheck getSchemaCheck() {
+        return new ProtobufSchemaCompatibilityCheck();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 98afe88bde..9ccdbf03d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -219,7 +219,8 @@ public void dontReAddExistingSchemaInMiddle() throws Exception {
     }
 
     private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion) throws Exception {
-        CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(schemaId, schema);
+        CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(
+                schemaId, schema, SchemaCompatibilityStrategy.FULL);
         SchemaVersion newVersion = put.get();
         assertEquals(expectedVersion, newVersion);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index e2c8747564..c9e919e067 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -27,6 +27,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -118,7 +119,8 @@ public void testJsonProducerAndConsumerWithPrestoredSchema() throws Exception {
                     .user("me")
                     .data(jsonSchema.getSchemaInfo().getSchema())
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
@@ -159,7 +161,8 @@ public void testJsonConsumerWithWrongCorruptedSchema() throws Exception {
                     .user("me")
                     .data(randomSchemaBytes)
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
@@ -186,7 +189,8 @@ public void testJsonProducerWithWrongCorruptedSchema() throws Exception {
                     .user("me")
                     .data(randomSchemaBytes)
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Producer<JsonEncodedPojo> producer = pulsarClient
@@ -264,7 +268,8 @@ public void testProtobufConsumerWithWrongPrestoredSchema() throws Exception {
                                 .user("me")
                                 .data(schema.getSchemaInfo().getSchema())
                                 .props(Collections.emptyMap())
-                                .build()
+                                .build(),
+                        SchemaCompatibilityStrategy.FULL
                 ).get();
 
         Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong> consumer = pulsarClient
@@ -345,7 +350,8 @@ public void testAvroConsumerWithWrongPrestoredSchema() throws Exception {
                     .user("me")
                     .data(randomSchemaBytes)
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Consumer<AvroEncodedPojo> consumer = pulsarClient
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 8a319c6969..6460c1ff5c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -34,6 +34,7 @@
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 
 /**
@@ -1281,4 +1282,44 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle,
      *             Unexpected error
      */
     void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the strategy used to check the a new schema provided by a producer is compatible with the current schema
+     * before it is installed.
+     *
+     * <p>If this is
+     * {@link org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy#AutoUpdateDisabled},
+     * then all new schemas provided via the producer are rejected, and schemas must be updated through the REST api.
+     *
+     * @param namespace The namespace in whose policy we are interested
+     * @return the strategy used to check compatibility
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(String namespace)
+            throws PulsarAdminException;
+
+    /**
+     * Set the strategy used to check the a new schema provided by a producer is compatible with the current schema
+     * before it is installed.
+     *
+     * <p>To disable all new schema updates through the producer, set this to
+     * {@link org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy#AutoUpdateDisabled}.
+     *
+     * @param namespace The namespace in whose policy should be set
+     * @param autoUpdate true if connecting producers can automatically update the schema, false otherwise
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
+                                                  SchemaAutoUpdateCompatibilityStrategy strategy)
+            throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index ba2950efe5..c9845a918c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -43,6 +43,7 @@
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 
 public class NamespacesImpl extends BaseResource implements Namespaces {
@@ -736,6 +737,32 @@ public void clearOffloadDeleteLag(String namespace) throws PulsarAdminException
         }
     }
 
+    @Override
+    public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(String namespace)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "schemaAutoUpdateCompatibilityStrategy");
+            return request(path).get(SchemaAutoUpdateCompatibilityStrategy.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
+                                                         SchemaAutoUpdateCompatibilityStrategy strategy)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "schemaAutoUpdateCompatibilityStrategy");
+            request(path).put(Entity.entity(strategy, MediaType.APPLICATION_JSON),
+                              ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 9c7efeb6a9..d376a86812 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -61,6 +61,9 @@
     public long offload_threshold = -1;
     public Long offload_deletion_lag_ms = null;
 
+    public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy =
+        SchemaAutoUpdateCompatibilityStrategy.Full;
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof Policies) {
@@ -82,7 +85,8 @@ public boolean equals(Object obj) {
                     && max_consumers_per_subscription == other.max_consumers_per_subscription
                     && compaction_threshold == other.compaction_threshold
                     && offload_threshold == other.offload_threshold
-                    && offload_deletion_lag_ms == other.offload_deletion_lag_ms;
+                    && offload_deletion_lag_ms == other.offload_deletion_lag_ms
+                    && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy;
         }
 
         return false;
@@ -115,6 +119,7 @@ public String toString() {
                 .add("max_consumers_per_subscription", max_consumers_per_topic)
                 .add("compaction_threshold", compaction_threshold)
                 .add("offload_threshold", offload_threshold)
-                .add("offload_deletion_lag_ms", offload_deletion_lag_ms).toString();
+                .add("offload_deletion_lag_ms", offload_deletion_lag_ms)
+                .add("schema_auto_update_compatibility_strategy", schema_auto_update_compatibility_strategy).toString();
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
new file mode 100644
index 0000000000..ac6668981e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Strategy to use when checking an auto-updated schema for compatibility to the current schema.
+ */
+public enum SchemaAutoUpdateCompatibilityStrategy {
+    /**
+     * Don't allow any auto updates.
+     */
+    AutoUpdateDisabled,
+
+    /**
+     * Messages written in the previous schema can be read by the new schema.
+     * To be backward compatible, the new schema must not add any new fields that
+     * don't have default values. However, it may remove fields.
+     */
+    Backward,
+
+    /**
+     * Messages written in the new schema can be read by the previous schema.
+     * To be forward compatible, the new schema must not remove any fields which
+     * don't have default values in the previous schema. However, it may add new fields.
+     */
+    Forward,
+
+    /**
+     * Backward and Forward.
+     */
+    Full
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services