You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by iv...@apache.org on 2018/10/12 18:39:48 UTC

[pulsar] branch master updated: Automatic schema update can be disabled through admin interface (#2691)

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 966c054  Automatic schema update can be disabled through admin interface (#2691)
966c054 is described below

commit 966c0545e8743bce756a710fd82c5274c07ff051
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Oct 12 20:39:43 2018 +0200

    Automatic schema update can be disabled through admin interface (#2691)
    
    Currently when a producer connects to a topic with a new schema, if
    that schema is fully compatible with the current schema, the schema
    for the topic is updated to the new schema.
    
    This may not be desirable in some cases.
    
    This patch allows users to set the auto update schema compatibility
    strategy at the namespace level. There are 4 compatibility levels:
    disabled, backward, forward and full.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  50 ++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  35 +++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  33 +++
 .../pulsar/broker/admin/v2/SchemasResource.java    |   4 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  13 +-
 .../broker/service/persistent/PersistentTopic.java |  14 +-
 .../schema/AvroSchemaCompatibilityCheck.java       |  23 +-
 .../schema/DefaultSchemaRegistryService.java       |   6 +-
 .../schema/JsonSchemaCompatibilityCheck.java       |  23 +-
 .../service/schema/NeverSchemaValidator.java}      |  28 +-
 .../schema/ProtobufSchemaCompatibilityCheck.java   |   8 -
 .../service/schema/SchemaCompatibilityCheck.java   |  11 +-
 .../schema/SchemaCompatibilityStrategy.java        |  37 ++-
 .../broker/service/schema/SchemaRegistry.java      |   6 +-
 .../service/schema/SchemaRegistryServiceImpl.java  |  26 +-
 .../broker/admin/AdminApiSchemaAutoUpdateTest.java | 300 +++++++++++++++++++++
 .../schema/AvroSchemaCompatibilityCheckTest.java   |  13 +-
 .../schema/BaseAvroSchemaCompatibilityTest.java    |  64 +++--
 .../schema/JsonSchemaCompatibilityCheckTest.java   |  18 +-
 .../ProtobufSchemaCompatibilityCheckTest.java      |  14 +-
 .../broker/service/schema/SchemaServiceTest.java   |   3 +-
 .../api/SimpleTypedProducerConsumerTest.java       |  16 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  41 +++
 .../client/admin/internal/NamespacesImpl.java      |  27 ++
 .../pulsar/common/policies/data/Policies.java      |   9 +-
 .../SchemaAutoUpdateCompatibilityStrategy.java     |  49 ++++
 26 files changed, 729 insertions(+), 142 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4beba60..65c7d38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -39,6 +39,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
@@ -73,6 +74,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -1651,5 +1653,53 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
+    }
+
+    protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        mutatePolicy((policies) -> {
+                policies.schema_auto_update_compatibility_strategy = strategy;
+                return policies;
+            }, (policies) -> policies.schema_auto_update_compatibility_strategy,
+            "schemaAutoUpdateCompatibilityStrategy");
+    }
+
+
+    private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation,
+                                  Function<Policies, T> getter,
+                                  String policyName) {
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            policies = policyTransformation.apply(policies);
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated {} configuration: namespace={}, value={}",
+                     clientAppId(), policyName, namespaceName, getter.apply(policies));
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist",
+                     clientAppId(), policyName, namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
+                     clientAppId(), policyName, namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update {} configuration for namespace {}",
+                      clientAppId(), policyName, namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 42b89fc..561351d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -771,5 +772,39 @@ public class Namespaces extends NamespacesBase {
         internalSetOffloadThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{tenant}/{cluster}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "The strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(
+            @PathParam("tenant") String tenant,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, cluster, namespace);
+        return internalGetSchemaAutoUpdateCompatibilityStrategy();
+    }
+
+    @PUT
+    @Path("/{tenant}/{cluster}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "Update the strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String tenant,
+                                                         @PathParam("cluster") String cluster,
+                                                         @PathParam("namespace") String namespace,
+                                                         SchemaAutoUpdateCompatibilityStrategy strategy) {
+        validateNamespaceName(tenant, cluster, namespace);
+        internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 1e6e8c2..e335521 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -762,5 +763,37 @@ public class Namespaces extends NamespacesBase {
         internalSetOffloadDeletionLag(null);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "The strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetSchemaAutoUpdateCompatibilityStrategy();
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/schemaAutoUpdateCompatibilityStrategy")
+    @ApiOperation(value = "Update the strategy used to check the compatibility of new schemas,"
+                          + " provided by producers, before automatically updating the schema",
+                  notes = "The value AutoUpdateDisabled prevents producers from updating the schema. "
+                          + " If set to AutoUpdateDisabled, schemas must be updated through the REST api")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String tenant,
+                                                         @PathParam("namespace") String namespace,
+                                                         SchemaAutoUpdateCompatibilityStrategy strategy) {
+        validateNamespaceName(tenant, namespace);
+        internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 6076276..314d37a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -50,6 +50,7 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.DeleteSchemaResponse;
@@ -273,7 +274,8 @@ public class SchemasResource extends AdminResource {
                 .timestamp(clock.millis())
                 .type(SchemaType.valueOf(payload.getType()))
                 .user(defaultIfEmpty(clientAppId(), ""))
-                .build()
+                .build(),
+            SchemaCompatibilityStrategy.FULL
         ).thenAccept(version ->
             response.resume(
                 Response.accepted().entity(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 9527191..e6a62b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -138,6 +139,8 @@ public class NonPersistentTopic implements Topic {
 
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
+    private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+        SchemaCompatibilityStrategy.FULL;
 
     private static class TopicStats {
         public double averageMsgSize;
@@ -180,6 +183,9 @@ public class NonPersistentTopic implements Topic {
                     .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             isEncryptionRequired = policies.encryption_required;
+            schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                    policies.schema_auto_update_compatibility_strategy);
+
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
             isEncryptionRequired = false;
@@ -947,6 +953,9 @@ public class NonPersistentTopic implements Topic {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
         }
         isEncryptionRequired = data.encryption_required;
+        schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                data.schema_auto_update_compatibility_strategy);
+
         producers.forEach(producer -> {
             producer.checkPermissions();
             producer.checkEncryption();
@@ -1021,7 +1030,7 @@ public class NonPersistentTopic implements Topic {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .putSchemaIfAbsent(id, schema);
+            .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
@@ -1030,7 +1039,7 @@ public class NonPersistentTopic implements Topic {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .isCompatibleWithLatestVersion(id, schema);
+            .isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 34703fa..d0be156 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -180,6 +181,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
+    private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+        SchemaCompatibilityStrategy.FULL;
 
     private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
         @Override
@@ -255,6 +258,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                     .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             isEncryptionRequired = policies.encryption_required;
+
+            schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                    policies.schema_auto_update_compatibility_strategy);
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
             isEncryptionRequired = false;
@@ -1555,6 +1561,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
         }
         isEncryptionRequired = data.encryption_required;
+
+        schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                data.schema_auto_update_compatibility_strategy);
+
         producers.forEach(producer -> {
             producer.checkPermissions();
             producer.checkEncryption();
@@ -1815,7 +1825,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .putSchemaIfAbsent(id, schema);
+            .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
@@ -1824,7 +1834,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
             .getSchemaRegistryService()
-            .isCompatibleWithLatestVersion(id, schema);
+            .isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java
index b7dd6d2..2a3dc75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java
@@ -28,17 +28,11 @@ import org.apache.pulsar.common.schema.SchemaType;
 
 import java.util.Arrays;
 
-public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
-
-    private final SchemaCompatibilityStrategy compatibilityStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-    public AvroSchemaCompatibilityCheck () {
-        this(SchemaCompatibilityStrategy.FULL);
-    }
-
-    public AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy compatibilityStrategy) {
-        this.compatibilityStrategy = compatibilityStrategy;
-    }
+public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
+    private final static Logger log = LoggerFactory.getLogger(AvroSchemaCompatibilityCheck.class);
 
     @Override
     public SchemaType getSchemaType() {
@@ -46,14 +40,13 @@ public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
     }
 
     @Override
-    public boolean isCompatible(SchemaData from, SchemaData to) {
-
+    public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
         Schema.Parser fromParser = new Schema.Parser();
         Schema fromSchema = fromParser.parse(new String(from.getData()));
         Schema.Parser toParser = new Schema.Parser();
         Schema toSchema =  toParser.parse(new String(to.getData()));
 
-        SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true);
+        SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
         try {
             schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
         } catch (SchemaValidationException e) {
@@ -70,8 +63,10 @@ public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
                 return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
             case FORWARD:
                 return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
-            default:
+            case FULL:
                 return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
+            default:
+                return NeverSchemaValidator.INSTANCE;
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
index fef288c..2988079 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -36,7 +36,8 @@ public class DefaultSchemaRegistryService implements SchemaRegistryService {
     }
 
     @Override
-    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
+                                                              SchemaCompatibilityStrategy strategy) {
         return completedFuture(null);
     }
 
@@ -51,7 +52,8 @@ public class DefaultSchemaRegistryService implements SchemaRegistryService {
     }
 
     @Override
-    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema) {
+    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
+                                                                    SchemaCompatibilityStrategy strategy) {
         return CompletableFuture.completedFuture(true);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
index f3a5e62..68e5961 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
@@ -34,28 +34,17 @@ import java.util.Arrays;
 @SuppressWarnings("unused")
 public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
 
-    private final SchemaCompatibilityStrategy compatibilityStrategy;
-
-    public JsonSchemaCompatibilityCheck () {
-        this(SchemaCompatibilityStrategy.FULL);
-    }
-
-    public JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy compatibilityStrategy) {
-        this.compatibilityStrategy = compatibilityStrategy;
-    }
-
     @Override
     public SchemaType getSchemaType() {
         return SchemaType.JSON;
     }
 
     @Override
-    public boolean isCompatible(SchemaData from, SchemaData to) {
-
+    public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
         if (isAvroSchema(from)) {
             if (isAvroSchema(to)) {
                 // if both producer and broker have the schema in avro format
-                return isCompatibleAvroSchema(from, to);
+                return isCompatibleAvroSchema(from, to, strategy);
             } else if (isJsonSchema(to)) {
                 // if broker have the schema in avro format but producer sent a schema in the old json format
                 // allow old schema format for backwards compatiblity
@@ -85,13 +74,13 @@ public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
         }
     }
 
-    private boolean isCompatibleAvroSchema(SchemaData from, SchemaData to) {
+    private boolean isCompatibleAvroSchema(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
         Schema.Parser fromParser = new Schema.Parser();
         Schema fromSchema = fromParser.parse(new String(from.getData()));
         Schema.Parser toParser = new Schema.Parser();
         Schema toSchema =  toParser.parse(new String(to.getData()));
 
-        SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true);
+        SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
         try {
             schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
         } catch (SchemaValidationException e) {
@@ -148,8 +137,10 @@ public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
                 return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
             case FORWARD:
                 return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
-            default:
+            case FULL:
                 return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
+            default:
+                return NeverSchemaValidator.INSTANCE;
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/NeverSchemaValidator.java
similarity index 52%
copy from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/NeverSchemaValidator.java
index adc1ecf..ac3b64f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/NeverSchemaValidator.java
@@ -18,20 +18,26 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
-public class ProtobufSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidationException;
+import org.apache.avro.SchemaValidator;
 
-    @Override
-    public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
-    }
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-    @Override
-    public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
-    }
+/**
+ * An avro schema validator that always reports as incompatible, if there is an existing schema.
+ */
+class NeverSchemaValidator implements SchemaValidator {
+    private final static Logger log = LoggerFactory.getLogger(NeverSchemaValidator.class);
+    static NeverSchemaValidator INSTANCE = new NeverSchemaValidator();
 
     @Override
-    public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
+    public void validate(Schema toValidate, Iterable<Schema> existing)
+            throws SchemaValidationException {
+        for (Schema s : existing) {
+            // only throw exception if there are existing schemas
+            throw new SchemaValidationException(toValidate, toValidate);
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java
index 0e4dafe..7b78eb4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java
@@ -22,14 +22,6 @@ import org.apache.pulsar.common.schema.SchemaType;
 
 public class ProtobufSchemaCompatibilityCheck extends AvroSchemaCompatibilityCheck {
 
-    public ProtobufSchemaCompatibilityCheck () {
-        this(SchemaCompatibilityStrategy.FULL);
-    }
-
-    public ProtobufSchemaCompatibilityCheck (SchemaCompatibilityStrategy compatibilityStrategy) {
-        super(compatibilityStrategy);
-    }
-
     @Override
     public SchemaType getSchemaType() {
         return SchemaType.PROTOBUF;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
index eea0e78..7e8335f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
@@ -28,9 +28,10 @@ public interface SchemaCompatibilityCheck {
      *
      * @param from the current schema i.e. schema that the broker has
      * @param to the future schema i.e. the schema sent by the producer
+     * @param strategy the strategy to use when comparing schemas
      * @return whether the schemas are compatible
      */
-    boolean isCompatible(SchemaData from, SchemaData to);
+    boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy);
 
     SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() {
         @Override
@@ -39,8 +40,12 @@ public interface SchemaCompatibilityCheck {
         }
 
         @Override
-        public boolean isCompatible(SchemaData from, SchemaData to) {
-            return true;
+        public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
+            if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
+                return false;
+            } else {
+                return true;
+            }
         }
     };
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
index 85c9d50..12e125e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java
@@ -18,8 +18,43 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+
 public enum SchemaCompatibilityStrategy {
+    /**
+     * Always incompatible
+     */
+    ALWAYS_INCOMPATIBLE,
+
+    /**
+     * Messages written by an old schema can be read by a new schema
+     */
     BACKWARD,
+
+    /**
+     * Messages written by a new schema can be read by an old schema
+     */
     FORWARD,
-    FULL
+
+    /**
+     * Equivalent to both FORWARD and BACKWARD
+     */
+    FULL;
+
+    public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) {
+        if (strategy == null) {
+            return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE;
+        }
+        switch (strategy) {
+        case Backward:
+            return BACKWARD;
+        case Forward:
+            return FORWARD;
+        case Full:
+            return FULL;
+        case AutoUpdateDisabled:
+        default:
+            return ALWAYS_INCOMPATIBLE;
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
index 8a2e6ab..6b8ea24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -30,11 +30,13 @@ public interface SchemaRegistry extends AutoCloseable {
 
     CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version);
 
-    CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema);
+    CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
+                                                       SchemaCompatibilityStrategy strategy);
 
     CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user);
 
-    CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema);
+    CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
+                                                             SchemaCompatibilityStrategy strategy);
 
     SchemaVersion versionFromBytes(byte[] version);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index aa0a39e..dc5a681 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.HashCode;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
@@ -82,9 +83,10 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
 
     @Override
     @NotNull
-    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
+                                                              SchemaCompatibilityStrategy strategy) {
         return getSchema(schemaId).thenApply(
-                (existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema))
+                (existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema, strategy))
             .thenCompose(isCompatible -> {
                     if (isCompatible) {
                         byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
@@ -112,8 +114,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     }
 
     @Override
-    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema) {
-        return checkCompatibilityWithLatest(schemaId, schema);
+    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema,
+                                                                    SchemaCompatibilityStrategy strategy) {
+        return checkCompatibilityWithLatest(schemaId, schema, strategy);
     }
 
     @Override
@@ -137,14 +140,19 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             .build();
     }
 
-    private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema) {
-        return compatibilityChecks.getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
-            .isCompatible(existingSchema.schema, newSchema);
+    private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema,
+                                 SchemaCompatibilityStrategy strategy) {
+        HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData());
+        HashCode newHash = hashFunction.hashBytes(newSchema.getData());
+        return newHash.equals(existingHash) ||
+            compatibilityChecks.getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
+            .isCompatible(existingSchema.schema, newSchema, strategy);
     }
 
-    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) {
+    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
+                                                                    SchemaCompatibilityStrategy strategy) {
         return getSchema(schemaId).thenApply(
-                (existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema));
+                (existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema, strategy));
     }
 
     interface Functions {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
new file mode 100644
index 0000000..5060c88
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import org.apache.avro.reflect.AvroAlias;
+import org.apache.avro.reflect.AvroDefault;
+import com.google.common.collect.Sets;
+
+import java.lang.reflect.Field;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("prop-xyz", tenantInfo);
+        admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test"));
+        admin.namespaces().createNamespace("prop-xyz/test/ns1");
+        admin.namespaces().createNamespace("prop-xyz/ns2", Sets.newHashSet("test"));
+        admin.namespaces().createNamespace("prop-xyz/test/ns2");
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private void testAutoUpdateBackward(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                                                                    SchemaAutoUpdateCompatibilityStrategy.Backward);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+
+        log.info("try with forward compat, should fail");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            Assert.fail("Forward compat schema should be rejected");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with backward compat, should succeed");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            p.send(new V2Data("test2"));
+        }
+
+    }
+
+    private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                                                                    SchemaAutoUpdateCompatibilityStrategy.Forward);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+
+        log.info("try with backward compat, should fail");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            Assert.fail("Backward compat schema should be rejected");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with forward compat, should succeed");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            p.send(new V3Data("test2", 1, 2));
+        }
+    }
+
+    private void testAutoUpdateFull(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+
+        log.info("try with backward compat only, should fail");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            Assert.fail("Backward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with forward compat only, should fail");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            Assert.fail("Forward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with fully compat");
+        try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+            p.send(new V4Data("test2", 1, (short)100));
+        }
+    }
+
+    private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception {
+        Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
+                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test1", 1));
+        }
+        log.info("try with backward compat only, should fail");
+        try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
+            Assert.fail("Backward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with forward compat only, should fail");
+        try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
+            Assert.fail("Forward compat only schema should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("try with fully compat, should fail");
+        try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+            Assert.fail("Fully compat schema should fail, autoupdate disabled");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+
+        log.info("Should still be able to connect with original schema");
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
+            p.send(new V1Data("test2", 2));
+        }
+
+        admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
+                SchemaAutoUpdateCompatibilityStrategy.Full);
+
+        for (int i = 0; i < 100; i++) {
+            Topic t = pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+            // get around the fact that the field is private and the topic can be persistent or non-persistent
+            Field strategy = t.getClass().getDeclaredField("schemaCompatibilityStrategy");
+            strategy.setAccessible(true);
+            if (((SchemaCompatibilityStrategy)strategy.get(t)) == SchemaCompatibilityStrategy.FULL) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        log.info("try with fully compat, again");
+        try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
+            p.send(new V4Data("test2", 1, (short)100));
+        }
+    }
+
+    @AvroAlias(space="blah", alias="data")
+    static class V1Data {
+        String foo;
+        int bar;
+
+        V1Data(String foo, int bar) {
+            this.foo = foo;
+            this.bar = bar;
+        }
+    }
+
+    // backward compatible with V1Data
+    @AvroAlias(space="blah", alias="data")
+    static class V2Data {
+        String foo;
+
+        V2Data(String foo) {
+            this.foo = foo;
+        }
+    }
+
+    // forward compatible with V1Data
+    @AvroAlias(space="blah", alias="data")
+    static class V3Data {
+        String foo;
+        int bar;
+        long baz;
+
+        V3Data(String foo, int bar, long baz) {
+            this.foo = foo;
+            this.bar = bar;
+            this.baz = baz;
+        }
+    }
+
+    // fully compatible with V1Data
+    @AvroAlias(space="blah", alias="data")
+    static class V4Data {
+        String foo;
+        int bar;
+        @AvroDefault(value = "10")
+        short blah;
+
+        V4Data(String foo, int bar, short blah) {
+            this.foo = foo;
+            this.bar = bar;
+            this.blah = blah;
+        }
+    }
+
+    @Test
+    public void testBackwardV2() throws Exception {
+        testAutoUpdateBackward("prop-xyz/ns1", "persistent://prop-xyz/ns1/backward");
+        testAutoUpdateBackward("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/backward-np");
+    }
+
+    @Test
+    public void testForwardV2() throws Exception {
+        testAutoUpdateForward("prop-xyz/ns1", "persistent://prop-xyz/ns1/forward");
+        testAutoUpdateForward("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/forward-np");
+    }
+
+    @Test
+    public void testFullV2() throws Exception {
+        testAutoUpdateFull("prop-xyz/ns1", "persistent://prop-xyz/ns1/full");
+        testAutoUpdateFull("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/full-np");
+    }
+
+    @Test
+    public void testDisabledV2() throws Exception {
+        testAutoUpdateDisabled("prop-xyz/ns1", "persistent://prop-xyz/ns1/disabled");
+        testAutoUpdateDisabled("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/disabled-np");
+    }
+
+    @Test
+    public void testBackwardV1() throws Exception {
+        testAutoUpdateBackward("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/backward");
+        testAutoUpdateBackward("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/backward-np");
+    }
+
+    @Test
+    public void testForwardV1() throws Exception {
+        testAutoUpdateForward("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/forward");
+        testAutoUpdateForward("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/forward-np");
+    }
+
+    @Test
+    public void testFullV1() throws Exception {
+        testAutoUpdateFull("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/full");
+        testAutoUpdateFull("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/full-np");
+    }
+
+    @Test
+    public void testDisabledV1() throws Exception {
+        testAutoUpdateDisabled("prop-xyz/test/ns1", "persistent://prop-xyz/test/ns1/disabled");
+        testAutoUpdateDisabled("prop-xyz/test/ns2", "non-persistent://prop-xyz/test/ns2/disabled-np");
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java
index 269a772..695d664 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java
@@ -26,17 +26,8 @@ import org.testng.annotations.Test;
 public class AvroSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
 
     @Override
-    public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
-        return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
+    public SchemaCompatibilityCheck getSchemaCheck() {
+        return new AvroSchemaCompatibilityCheck();
     }
 
-    @Override
-    public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
-        return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
-        return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java
index b54e952..67d8432 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java
@@ -70,12 +70,7 @@ public abstract class BaseAvroSchemaCompatibilityTest {
     private static final SchemaData schemaData7 = getSchemaData(schemaJson7);
 
 
-    public abstract SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck();
-
-    public abstract SchemaCompatibilityCheck getForwardCompatibleSchemaCheck();
-
-    public abstract SchemaCompatibilityCheck getFullCompatibleSchemaCheck();
-
+    public abstract SchemaCompatibilityCheck getSchemaCheck();
 
     /**
      * make sure new schema is backwards compatible with latest
@@ -83,27 +78,34 @@ public abstract class BaseAvroSchemaCompatibilityTest {
     @Test
     public void testBackwardCompatibility() {
 
-        SchemaCompatibilityCheck schemaCompatibilityCheck = getBackwardsCompatibleSchemaCheck();
+        SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck();
         // adding a field with default is backwards compatible
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2,
+                                                                SchemaCompatibilityStrategy.BACKWARD),
                 "adding a field with default is backwards compatible");
         // adding a field without default is NOT backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "adding a field without default is NOT backwards compatible");
         // Modifying a field name is not backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "Modifying a field name is not backwards compatible");
         // evolving field to a union is backwards compatible
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData5),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData5,
+                                                                SchemaCompatibilityStrategy.BACKWARD),
                 "evolving field to a union is backwards compatible");
         // removing a field from a union is NOT backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData1),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData1,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "removing a field from a union is NOT backwards compatible");
         // adding a field to a union is backwards compatible
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData6),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData6,
+                                                                SchemaCompatibilityStrategy.BACKWARD),
                 "adding a field to a union is backwards compatible");
         // removing a field a union is NOT backwards compatible
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData6, schemaData5),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData6, schemaData5,
+                                                                 SchemaCompatibilityStrategy.BACKWARD),
                 "removing a field a union is NOT backwards compatible");
     }
 
@@ -113,21 +115,28 @@ public abstract class BaseAvroSchemaCompatibilityTest {
     @Test
     public void testForwardCompatibility() {
 
-        SchemaCompatibilityCheck schemaCompatibilityCheck = getForwardCompatibleSchemaCheck();
+        SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck();
 
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData3),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData3,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "adding a field is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData7),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData7,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "removing fields is forward compatible");
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData1),
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData1,
+                                                                SchemaCompatibilityStrategy.FORWARD),
                 "removing fields with defaults forward compatible");
     }
 
@@ -136,12 +145,15 @@ public abstract class BaseAvroSchemaCompatibilityTest {
      */
     @Test
     public void testFullCompatibility() {
-        SchemaCompatibilityCheck schemaCompatibilityCheck = getFullCompatibleSchemaCheck();
-        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
+        SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck();
+        Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2,
+                                                                SchemaCompatibilityStrategy.FULL),
                 "adding a field with default fully compatible");
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
+                                                                 SchemaCompatibilityStrategy.FULL),
                 "adding a field without default is not fully compatible");
-        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData1),
+        Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData1,
+                                                                 SchemaCompatibilityStrategy.FULL),
                 "adding a field without default is not fully compatible");
 
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 341487f..9a8a539 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -46,18 +46,8 @@ import org.testng.annotations.Test;
 public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
 
     @Override
-    public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
-        return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
-        return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
-        return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
+    public SchemaCompatibilityCheck getSchemaCheck() {
+        return new JsonSchemaCompatibilityCheck();
     }
 
     @Test
@@ -66,11 +56,11 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
         SchemaData from = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         SchemaData to = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new JsonSchemaCompatibilityCheck();
-        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to));
+        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
 
         from = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         to = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
-        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to));
+        Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
     }
 
     @Data
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
index adc1ecf..34e1ea8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheckTest.java
@@ -21,17 +21,7 @@ package org.apache.pulsar.broker.service.schema;
 public class ProtobufSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
 
     @Override
-    public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
-    }
-
-    @Override
-    public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
-        return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
+    public SchemaCompatibilityCheck getSchemaCheck() {
+        return new ProtobufSchemaCompatibilityCheck();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 98afe88..9ccdbf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -219,7 +219,8 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
     }
 
     private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion) throws Exception {
-        CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(schemaId, schema);
+        CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(
+                schemaId, schema, SchemaCompatibilityStrategy.FULL);
         SchemaVersion newVersion = put.get();
         assertEquals(expectedVersion, newVersion);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index e2c8747..c9e919e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -118,7 +119,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
                     .user("me")
                     .data(jsonSchema.getSchemaInfo().getSchema())
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
@@ -159,7 +161,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
                     .user("me")
                     .data(randomSchemaBytes)
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
@@ -186,7 +189,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
                     .user("me")
                     .data(randomSchemaBytes)
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Producer<JsonEncodedPojo> producer = pulsarClient
@@ -264,7 +268,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
                                 .user("me")
                                 .data(schema.getSchemaInfo().getSchema())
                                 .props(Collections.emptyMap())
-                                .build()
+                                .build(),
+                        SchemaCompatibilityStrategy.FULL
                 ).get();
 
         Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong> consumer = pulsarClient
@@ -345,7 +350,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
                     .user("me")
                     .data(randomSchemaBytes)
                     .props(Collections.emptyMap())
-                    .build()
+                    .build(),
+                SchemaCompatibilityStrategy.FULL
             ).get();
 
         Consumer<AvroEncodedPojo> consumer = pulsarClient
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 8a319c6..6460c1f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 
 /**
@@ -1281,4 +1282,44 @@ public interface Namespaces {
      *             Unexpected error
      */
     void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the strategy used to check that a new schema provided by a producer is compatible with the current schema
+     * before it is installed.
+     *
+     * <p>If this is
+     * {@link org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy#AutoUpdateDisabled},
+     * then all new schemas provided via the producer are rejected, and schemas must be updated through the REST api.
+     *
+     * @param namespace The namespace in whose policy we are interested
+     * @return the strategy used to check compatibility
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(String namespace)
+            throws PulsarAdminException;
+
+    /**
+     * Set the strategy used to check that a new schema provided by a producer is compatible with the current schema
+     * before it is installed.
+     *
+     * <p>To disable all new schema updates through the producer, set this to
+     * {@link org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy#AutoUpdateDisabled}.
+     *
+     * @param namespace The namespace whose policy should be set
+     * @param strategy The compatibility strategy to apply when a producer provides a new schema
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
+                                                  SchemaAutoUpdateCompatibilityStrategy strategy)
+            throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index ba2950e..c9845a9 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 
 public class NamespacesImpl extends BaseResource implements Namespaces {
@@ -736,6 +737,32 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         }
     }
 
+    @Override
+    public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(String namespace)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "schemaAutoUpdateCompatibilityStrategy");
+            return request(path).get(SchemaAutoUpdateCompatibilityStrategy.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
+                                                         SchemaAutoUpdateCompatibilityStrategy strategy)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "schemaAutoUpdateCompatibilityStrategy");
+            request(path).put(Entity.entity(strategy, MediaType.APPLICATION_JSON),
+                              ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 9c7efeb..d376a86 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -61,6 +61,9 @@ public class Policies {
     public long offload_threshold = -1;
     public Long offload_deletion_lag_ms = null;
 
+    public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy =
+        SchemaAutoUpdateCompatibilityStrategy.Full;
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof Policies) {
@@ -82,7 +85,8 @@ public class Policies {
                     && max_consumers_per_subscription == other.max_consumers_per_subscription
                     && compaction_threshold == other.compaction_threshold
                     && offload_threshold == other.offload_threshold
-                    && offload_deletion_lag_ms == other.offload_deletion_lag_ms;
+                    && offload_deletion_lag_ms == other.offload_deletion_lag_ms
+                    && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy;
         }
 
         return false;
@@ -115,6 +119,7 @@ public class Policies {
                 .add("max_consumers_per_subscription", max_consumers_per_topic)
                 .add("compaction_threshold", compaction_threshold)
                 .add("offload_threshold", offload_threshold)
-                .add("offload_deletion_lag_ms", offload_deletion_lag_ms).toString();
+                .add("offload_deletion_lag_ms", offload_deletion_lag_ms)
+                .add("schema_auto_update_compatibility_strategy", schema_auto_update_compatibility_strategy).toString();
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
new file mode 100644
index 0000000..ac66689
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Strategy to use when checking an auto-updated schema for compatibility with the current schema.
+ */
+public enum SchemaAutoUpdateCompatibilityStrategy {
+    /**
+     * Don't allow any auto updates.
+     */
+    AutoUpdateDisabled,
+
+    /**
+     * Messages written in the previous schema can be read by the new schema.
+     * To be backward compatible, the new schema must not add any new fields that
+     * don't have default values. However, it may remove fields.
+     */
+    Backward,
+
+    /**
+     * Messages written in the new schema can be read by the previous schema.
+     * To be forward compatible, the new schema must not remove any fields which
+     * don't have default values in the previous schema. However, it may add new fields.
+     */
+    Forward,
+
+    /**
+     * Backward and Forward.
+     */
+    Full
+}