You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/07 23:42:23 UTC

[pulsar] branch master updated: [schema] provide a flag to disable/enable schema validation on broker (#2730)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0da4e4a  [schema] provide a flag to disable/enable schema validation on broker (#2730)
0da4e4a is described below

commit 0da4e4af407650de874ed6c9de891d98aa540aad
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Sun Oct 7 16:42:18 2018 -0700

    [schema] provide a flag to disable/enable schema validation on broker (#2730)
    
    *Motivation*
    
    We need an upgrade/backward compatibility story for schema enforcement. Especially around:
    
    - `Producers cannot connect without a schema to topics with a schema`
    
    *Changes*
    
    - provide a flag on brokers to enable schema validation (and disabled it by default). this allows a smooth upgrade on brokers,
      otherwise, it will break all non-java producers on topics with schema immediately when upgrade to the new version
---
 conf/broker.conf                                   |  7 ++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 +++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 31 +++++++++--------
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    | 31 +++++++++--------
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  1 +
 .../pulsar/client/api/BrokerServiceLookupTest.java | 28 +++++++++------
 .../apache/pulsar/client/api/SimpleSchemaTest.java | 40 ++++++++++++++++++++--
 .../java/org/apache/pulsar/client/api/Schema.java  | 16 +++++++++
 .../client/impl/schema/AutoConsumeSchema.java      |  7 ++++
 .../client/impl/schema/AutoProduceBytesSchema.java | 24 ++++++++++---
 .../pulsar/client/impl/schema/ByteSchema.java      | 10 ++++--
 .../pulsar/client/impl/schema/DoubleSchema.java    | 11 ++++--
 .../pulsar/client/impl/schema/FloatSchema.java     | 11 ++++--
 .../pulsar/client/impl/schema/IntSchema.java       | 11 ++++--
 .../pulsar/client/impl/schema/LongSchema.java      | 11 ++++--
 .../pulsar/client/impl/schema/ShortSchema.java     | 11 ++++--
 .../org/apache/pulsar/client/api/PulsarClient.java |  8 ++---
 .../ProxyAuthenticatedProducerConsumerTest.java    |  3 +-
 .../proxy/server/ProxyAuthenticationTest.java      |  7 ++--
 .../server/ProxyConnectionThrottlingTest.java      |  5 +--
 .../proxy/server/ProxyLookupThrottlingTest.java    |  7 ++--
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  9 ++---
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |  5 +--
 .../server/ProxyWithAuthorizationNegTest.java      |  5 +--
 .../proxy/server/ProxyWithAuthorizationTest.java   |  3 +-
 .../server/ProxyWithoutServiceDiscoveryTest.java   |  3 +-
 tests/integration/pom.xml                          |  6 ----
 .../integration/compat/kafka/KafkaApiTest.java     |  1 +
 29 files changed, 228 insertions(+), 97 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index f4a57b6..867dcb3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -517,6 +517,13 @@ exposePublisherStats=true
 # The schema storage implementation used by this broker
 schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
 
+# Enforce schema validation on following cases:
+#
+# - if a producer without a schema attempts to produce to a topic with schema, the producer will be
+#   failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.
+#   if you enable this setting, it will cause non-java clients failed to produce.
+isSchemaValidationEnforced=false
+
 ### --- Ledger Offloading --- ###
 
 # The directory for all the offloader implementations
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e436ff4..3ddd08c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -472,6 +472,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // Interval between checks to see if topics with compaction policies need to be compacted
     private int brokerServiceCompactionMonitorIntervalInSeconds = 60;
 
+    private boolean isSchemaValidationEnforced = false;
     private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
     private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
             "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
@@ -1658,6 +1659,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
         this.exposeConsumerLevelMetricsInPrometheus = exposeConsumerLevelMetricsInPrometheus;
     }
 
+    public boolean isSchemaValidationEnforced() {
+        return isSchemaValidationEnforced;
+    }
+
+    public void setSchemaValidationEnforced(boolean enforced) {
+        this.isSchemaValidationEnforced = enforced;
+    }
+
     public String getSchemaRegistryStorageClassName() {
        return schemaRegistryStorageClassName;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 500ee97..483bf66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -129,6 +129,7 @@ public class ServerCnx extends PulsarHandler {
     private String originalPrincipal = null;
     private Set<String> proxyRoles;
     private boolean authenticateOriginalAuthData;
+    private final boolean schemaValidationEnforced;
 
     enum State {
         Start, Connected, Failed
@@ -148,6 +149,7 @@ public class ServerCnx extends PulsarHandler {
                 .getMaxConcurrentNonPersistentMessagePerConnection();
         this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
         this.authenticateOriginalAuthData = service.pulsar().getConfiguration().authenticateOriginalAuthData();
+        this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
     }
 
     @Override
@@ -833,7 +835,7 @@ public class ServerCnx extends PulsarHandler {
                             } else {
                                 schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
                                         CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
-                                        if (hasSchema) {
+                                        if (hasSchema && schemaValidationEnforced) {
                                             result.completeExceptionally(new IncompatibleSchemaException(
                                                 "Producers cannot connect without a schema to topics with a schema"));
                                         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 187539f..5c18dfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -648,7 +649,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0));
 
         // Force topic creation and namespace being loaded
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1/my-topic")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -685,7 +686,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         }
 
         // Force topic creation and namespace being loaded
-        producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
+        producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
         producer.close();
         admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
 
@@ -836,7 +837,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
         assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -894,7 +895,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer()
+        producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -957,7 +958,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -987,7 +988,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1084,7 +1085,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1146,7 +1147,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1203,7 +1204,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1216,7 +1217,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = pulsarClient.newProducer()
+        Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1-bundles/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1314,7 +1315,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1363,7 +1364,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/ns1/my-topic";
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1755,7 +1756,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -1949,7 +1950,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         String topicName = "persistent://prop-xyz/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         // mock actual compaction, we don't need to really run it
@@ -1983,7 +1984,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         String topicName = "persistent://prop-xyz/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index d2fb996..803f8af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -641,7 +642,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.namespaces().getPersistence("prop-xyz/use/ns1"), new PersistencePolicies(3, 2, 1, 10.0));
 
         // Force topic creation and namespace being loaded
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/my-topic").create();
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns1/my-topic").create();
         producer.close();
         admin.topics().delete("persistent://prop-xyz/use/ns1/my-topic");
 
@@ -674,7 +675,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         }
 
         // Force topic creation and namespace being loaded
-        producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
+        producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
         producer.close();
         admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
 
@@ -806,7 +807,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
         assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -864,7 +865,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer()
+        producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -926,7 +927,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -956,7 +957,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1080,7 +1081,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1141,7 +1142,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1197,7 +1198,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1210,7 +1211,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = pulsarClient.newProducer()
+        Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1-bundles/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1307,7 +1308,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1356,7 +1357,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/use/ns1/my-topic";
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1748,7 +1749,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -1939,7 +1940,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         String topicName = "persistent://prop-xyz/use/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         // mock actual compaction, we don't need to really run it
@@ -1973,7 +1974,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         String topicName = "persistent://prop-xyz/use/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 5c64ac80..aa6dd71 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Message;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 6a7953c..8a58321 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -176,8 +176,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // load namespace-bundle by calling Broker2
         Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+            .topic("persistent://my-property/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -263,8 +264,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // load namespace-bundle by calling Broker2
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+            .topic("persistent://my-property2/use2/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -336,7 +338,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
         /**** broker-2 started ****/
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+            .topic(topicName.toString())
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
@@ -489,8 +492,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(discoverySvcUrl).build();
         Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+            .topic("persistent://my-property2/use2/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -560,8 +564,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
                 .enableTls(true).allowTlsInsecureConnection(true).build();
         Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+            .topic("persistent://my-property2/use2/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -635,8 +640,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth).build();
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+            .topic("persistent://my-property/use2/my-ns/my-topic1")
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index e6de6bf..7d524a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -20,17 +20,34 @@ package org.apache.pulsar.client.api;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 public class SimpleSchemaTest extends ProducerConsumerBase {
 
+    @DataProvider(name = "schemaValidationModes")
+    public static Object[][] schemaValidationModes() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    private final boolean schemaValidationEnforced;
+
+    @Factory(dataProvider = "schemaValidationModes")
+    public SimpleSchemaTest(boolean schemaValidationEnforced) {
+        this.schemaValidationEnforced = schemaValidationEnforced;
+    }
+
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        conf.setSchemaValidationEnforced(schemaValidationEnforced);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -130,10 +147,27 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             p.send(new V1Data(0));
         }
 
-        try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create()) {
-            Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema");
+        try (Producer<byte[]> p = pulsarClient.newProducer(Schema.BYTES).topic(topic).create()) {
+            if (!schemaValidationEnforced) {
+                p.send("junkdata".getBytes(UTF_8));
+            } else {
+                Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema"
+                    + " if SchemaValidationEnabled is enabled");
+            }
+        } catch (PulsarClientException e) {
+            if (schemaValidationEnforced) {
+                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+            } else {
+                Assert.fail("Shouldn't throw IncompatibleSchemaException"
+                    + " if SchemaValidationEnforced is disabled");
+            }
+        }
+
+        // if using AUTO_PRODUCE_BYTES, producer can connect but the publish will fail
+        try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create()) {
+            p.send("junkdata".getBytes(UTF_8));
         } catch (PulsarClientException e) {
-            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+            assertTrue(e.getCause() instanceof SchemaSerializationException);
         }
     }
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 5432d43..5872db0 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -41,6 +41,22 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 public interface Schema<T> {
 
     /**
+     * Check if the message is a valid object for this schema.
+     *
+     * <p>The implementation can choose what its most efficient approach to validate the schema.
+     * If the implementation doesn't provide it, it will attempt to use {@link #decode(byte[])}
+     * to see if this schema can decode this message or not as a validation mechanism to verify
+     * the bytes.
+     *
+     * @param message the messages to verify
+     * @return true if it is a valid message
+     * @throws SchemaSerializationException if it is not a valid message
+     */
+    default void validate(byte[] message) {
+        decode(message);
+    }
+
+    /**
      * Encode an object representing the message content into a byte array.
      *
      * @param message
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index dc90931..e795c70 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -41,6 +41,13 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
     }
 
     @Override
+    public void validate(byte[] message) {
+        ensureSchemaInitialized();
+
+        schema.validate(message);
+    }
+
+    @Override
     public byte[] encode(GenericRecord message) {
         ensureSchemaInitialized();
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 36520a9..517d53a 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -23,16 +23,21 @@ import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * Auto detect schema.
  */
 public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
+    private boolean requireSchemaValidation = true;
     private Schema<T> schema;
 
     public void setSchema(Schema<T> schema) {
         this.schema = schema;
+        this.requireSchemaValidation = schema.getSchemaInfo() != null
+            && SchemaType.BYTES != schema.getSchemaInfo().getType()
+            && SchemaType.NONE != schema.getSchemaInfo().getType();
     }
 
     private void ensureSchemaInitialized() {
@@ -40,11 +45,20 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
     }
 
     @Override
+    public void validate(byte[] message) {
+        ensureSchemaInitialized();
+
+        schema.validate(message);
+    }
+
+    @Override
     public byte[] encode(byte[] message) {
         ensureSchemaInitialized();
 
-        // verify if the message can be decoded by the underlying schema
-        schema.decode(message);
+        if (requireSchemaValidation) {
+            // verify if the message can be decoded by the underlying schema
+            schema.validate(message);
+        }
 
         return message;
     }
@@ -53,8 +67,10 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
     public byte[] decode(byte[] bytes) {
         ensureSchemaInitialized();
 
-        // verify the message can be detected by the underlying schema
-        schema.decode(bytes);
+        if (requireSchemaValidation) {
+            // verify the message can be detected by the underlying schema
+            schema.decode(bytes);
+        }
 
         return bytes;
     }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index da82216..d4635f2 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -38,6 +38,12 @@ public class ByteSchema implements Schema<Byte> {
         .setType(SchemaType.INT8)
         .setSchema(new byte[0]);
 
+    @Override
+    public void validate(byte[] message) {
+        if (message.length != 1) {
+            throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
+        }
+    }
 
     @Override
     public byte[] encode(Byte message) {
@@ -53,9 +59,7 @@ public class ByteSchema implements Schema<Byte> {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 1) {
-            throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
-        }
+        validate(bytes);
         return bytes[0];
     }
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index 8ffd9d3..e617efb 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -39,6 +39,13 @@ public class DoubleSchema implements Schema<Double> {
         .setSchema(new byte[0]);
 
     @Override
+    public void validate(byte[] message) {
+        if (message.length != 8) {
+            throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
+        }
+    }
+
+    @Override
     public byte[] encode(Double message) {
         if (null == message) {
             return null;
@@ -62,9 +69,7 @@ public class DoubleSchema implements Schema<Double> {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 8) {
-            throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
-        }
+        validate(bytes);
         long value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index b7c61fb..32ac469 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -39,6 +39,13 @@ public class FloatSchema implements Schema<Float> {
         .setSchema(new byte[0]);
 
     @Override
+    public void validate(byte[] message) {
+        if (message.length != 4) {
+            throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
+        }
+    }
+
+    @Override
     public byte[] encode(Float message) {
         if (null == message) {
             return null;
@@ -58,9 +65,7 @@ public class FloatSchema implements Schema<Float> {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 4) {
-            throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
-        }
+        validate(bytes);
         int value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index 33bd73b..90092a4 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -39,6 +39,13 @@ public class IntSchema implements Schema<Integer> {
         .setSchema(new byte[0]);
 
     @Override
+    public void validate(byte[] message) {
+        if (message.length != 4) {
+            throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
+        }
+    }
+
+    @Override
     public byte[] encode(Integer message) {
         if (null == message) {
             return null;
@@ -57,9 +64,7 @@ public class IntSchema implements Schema<Integer> {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 4) {
-            throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
-        }
+        validate(bytes);
         int value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index e82a901..b252279 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -39,6 +39,13 @@ public class LongSchema implements Schema<Long> {
         .setSchema(new byte[0]);
 
     @Override
+    public void validate(byte[] message) {
+        if (message.length != 8) {
+            throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
+        }
+    }
+
+    @Override
     public byte[] encode(Long data) {
         if (null == data) {
             return null;
@@ -61,9 +68,7 @@ public class LongSchema implements Schema<Long> {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 8) {
-            throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
-        }
+        validate(bytes);
         long value = 0L;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index fc73b89..f1ec133 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -39,6 +39,13 @@ public class ShortSchema implements Schema<Short> {
         .setSchema(new byte[0]);
 
     @Override
+    public void validate(byte[] message) {
+        if (message.length != 2) {
+            throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
+        }
+    }
+
+    @Override
     public byte[] encode(Short message) {
         if (null == message) {
             return null;
@@ -55,9 +62,7 @@ public class ShortSchema implements Schema<Short> {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 2) {
-            throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
-        }
+        validate(bytes);
         short value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index eb44188..74e09e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,13 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
 /**
  * Class that provides a client interface to Pulsar.
@@ -87,7 +84,6 @@ public interface PulsarClient extends Closeable {
      * Producer producer = client.newProducer().topic(myTopic).create();
      * </code>
      *
-     *
      * @return a {@link ProducerBuilder} object to configure and construct the {@link Producer} instance
      *
      * @since 2.0.0
@@ -124,9 +120,11 @@ public interface PulsarClient extends Closeable {
     /**
      * Create a consumer with default for subscribing on a specific topic
      *
+     * Since 2.2, if you are creating a consumer with non-bytes schema on a non-existence topic, it will
+     * automatically create the topic with the provided schema.
+     *
      * @param schema
      *          provide a way to convert between serialized data and domain objects
-     *
      * @return a {@link ConsumerBuilder} object to configure and construct the {@link Consumer} instance
      *
      * @since 2.0.0
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 1e57f73..c31773f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -170,7 +171,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
 
         Consumer<byte[]> consumer = proxyClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = proxyClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/my-topic1")
                 .create();
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 85a1f84..cd38628 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -247,13 +248,13 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
 
 		// Step 3: Pass correct client params
 		PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
-		proxyClient.newProducer().topic(topicName).create();
+		proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
 		// Sleep for 4 seconds - wait for proxy auth params to expire
 		Thread.sleep(4 * 1000);
-		proxyClient.newProducer().topic(topicName).create();
+		proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
 		// Sleep for 3 seconds - wait for client auth parans to expire
 		Thread.sleep(3 * 1000);
-		proxyClient.newProducer().topic(topicName).create();
+		proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
 		proxyClient.close();
 		proxyService.close();
 	}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 0923ded..3e28242 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         LOG.info("Creating producer 1");
         PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer1 = client1.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+        Producer<byte[]> producer1 = client1.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
 
         LOG.info("Creating producer 2");
         PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
@@ -79,7 +80,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         Producer<byte[]> producer2;
         Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
         try {
-            producer2 = client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+            producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
             producer2.send("Message 1".getBytes());
             Assert.fail("Should have failed since max num of connections is 2 and the first producer used them all up - one for discovery and other for producing.");
         } catch (Exception ex) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 09912ec..35dfb94 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -73,11 +74,11 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
                 .connectionsPerBroker(5).ioThreads(5).build();
         assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
         assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
-        Producer<byte[]> producer1 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+        Producer<byte[]> producer1 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                 .create();
         assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
         try {
-            Producer<byte[]> producer2 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+            Producer<byte[]> producer2 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                     .create();
             Assert.fail("Should have failed since can't acquire LookupRequestSemaphore");
         } catch (Exception ex) {
@@ -86,7 +87,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 1.0d);
         proxyService.getLookupRequestSemaphore().release();
         try {
-            Producer<byte[]> producer3 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+            Producer<byte[]> producer3 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                     .create();
         } catch (Exception ex) {
             Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 391a5bf..99d79fb 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.mockito.Mockito;
@@ -80,7 +81,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
     public void testProducer() throws Exception {
         PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                 .create();
 
         for (int i = 0; i < 10; i++) {
@@ -94,7 +95,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
     public void testProducerConsumer() throws Exception {
         PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://sample/test/local/producer-consumer-topic")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -128,7 +129,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
                 .build();
         admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://sample/test/local/partitioned-topic")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
@@ -177,7 +178,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
 
             final int numMessages = 20;
 
-            try (Producer<byte[]> producer = client.newProducer()
+            try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                 .topic("persistent://sample/test/local/topic1")
                 .create()) {
                 for (int i = 0; i < numMessages; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 116814b..db5070f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.mockito.Mockito;
@@ -84,7 +85,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
         PulsarClient client = PulsarClient.builder()
                 .serviceUrl("pulsar+ssl://localhost:" + proxyConfig.getServicePortTls()).enableTls(true)
                 .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build();
-        Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/topic").create();
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/topic").create();
 
         for (int i = 0; i < 10; i++) {
             producer.send("test".getBytes());
@@ -101,7 +102,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
         admin.tenants().createTenant("sample", new TenantInfo());
         admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
 
-        Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/partitioned-topic")
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/partitioned-topic")
                 .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // Create a consumer directly attached to broker
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 9ec8ced..5ee0dac 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -186,13 +187,13 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
         }
         Producer<byte[]> producer;
         try {
-            producer = proxyClient.newProducer()
+            producer = proxyClient.newProducer(Schema.BYTES)
                     .topic("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1").create();
         } catch (Exception ex) {
             // expected
             admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
                     Sets.newHashSet(AuthAction.produce, AuthAction.consume));
-            producer = proxyClient.newProducer()
+            producer = proxyClient.newProducer(Schema.BYTES)
                     .topic("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1").create();
         }
         final int msgs = 10;
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 0138de6..d894b08 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -241,7 +242,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
                 .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
 
-        Producer<byte[]> producer = proxyClient.newProducer()
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
                 .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 050aeec..4909a89 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -164,7 +165,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
         Consumer<byte[]> consumer = proxyClient.newConsumer()
                 .topic("persistent://my-property/without-service-discovery/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = proxyClient.newProducer()
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
                 .topic("persistent://my-property/without-service-discovery/my-ns/my-topic1").create();
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index adde9f2..653c076 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -74,12 +74,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client-kafka</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>com.datastax.cassandra</groupId>
       <artifactId>cassandra-driver-core</artifactId>
       <exclusions>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index 0f17aac..3dd8940 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.testng.annotations.Test;
 
+@Test(enabled = false)
 @Slf4j
 public class KafkaApiTest extends PulsarTestSuite {