You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/07 23:42:20 UTC

[GitHub] sijie closed pull request #2730: [schema] provide a flag to disable/enable schema validation on broker

sijie closed pull request #2730: [schema] provide a flag to disable/enable schema validation on broker
URL: https://github.com/apache/pulsar/pull/2730
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index f4a57b6b03..867dcb3de5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -517,6 +517,13 @@ exposePublisherStats=true
 # The schema storage implementation used by this broker
 schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
 
+# Enforce schema validation on following cases:
+#
+# - if a producer without a schema attempts to produce to a topic with schema, the producer will be
+#   failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.
+#   if you enable this setting, it will cause non-java clients failed to produce.
+isSchemaValidationEnforced=false
+
 ### --- Ledger Offloading --- ###
 
 # The directory for all the offloader implementations
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e436ff43ae..3ddd08c69d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -472,6 +472,7 @@
     // Interval between checks to see if topics with compaction policies need to be compacted
     private int brokerServiceCompactionMonitorIntervalInSeconds = 60;
 
+    private boolean isSchemaValidationEnforced = false;
     private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
     private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
             "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
@@ -1658,6 +1659,14 @@ public void setExposeConsumerLevelMetricsInPrometheus(boolean exposeConsumerLeve
         this.exposeConsumerLevelMetricsInPrometheus = exposeConsumerLevelMetricsInPrometheus;
     }
 
+    public boolean isSchemaValidationEnforced() {
+        return isSchemaValidationEnforced;
+    }
+
+    public void setSchemaValidationEnforced(boolean enforced) {
+        this.isSchemaValidationEnforced = enforced;
+    }
+
     public String getSchemaRegistryStorageClassName() {
        return schemaRegistryStorageClassName;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 500ee97427..483bf66451 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -129,6 +129,7 @@
     private String originalPrincipal = null;
     private Set<String> proxyRoles;
     private boolean authenticateOriginalAuthData;
+    private final boolean schemaValidationEnforced;
 
     enum State {
         Start, Connected, Failed
@@ -148,6 +149,7 @@ public ServerCnx(PulsarService pulsar) {
                 .getMaxConcurrentNonPersistentMessagePerConnection();
         this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
         this.authenticateOriginalAuthData = service.pulsar().getConfiguration().authenticateOriginalAuthData();
+        this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
     }
 
     @Override
@@ -833,7 +835,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                             } else {
                                 schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
                                         CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
-                                        if (hasSchema) {
+                                        if (hasSchema && schemaValidationEnforced) {
                                             result.completeExceptionally(new IncompatibleSchemaException(
                                                 "Producers cannot connect without a schema to topics with a schema"));
                                         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 187539fa90..5c18dfe2fb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -80,6 +80,7 @@
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -648,7 +649,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
         assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0));
 
         // Force topic creation and namespace being loaded
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1/my-topic")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -685,7 +686,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
         }
 
         // Force topic creation and namespace being loaded
-        producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
+        producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
         producer.close();
         admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
 
@@ -836,7 +837,7 @@ public void partitionedTopics(String topicName) throws Exception {
         admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
         assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -894,7 +895,7 @@ public void partitionedTopics(String topicName) throws Exception {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer()
+        producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -957,7 +958,7 @@ public void testNamespaceSplitBundle() throws Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -987,7 +988,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1084,7 +1085,7 @@ public void testNamespaceUnloadBundle() throws Exception {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1146,7 +1147,7 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1203,7 +1204,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1216,7 +1217,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = pulsarClient.newProducer()
+        Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1-bundles/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1314,7 +1315,7 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages) th
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1363,7 +1364,7 @@ public void statsOnNonExistingTopics() throws Exception {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/ns1/my-topic";
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1755,7 +1756,7 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
         Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/ns1/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -1949,7 +1950,7 @@ public void testTriggerCompaction() throws Exception {
         String topicName = "persistent://prop-xyz/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         // mock actual compaction, we don't need to really run it
@@ -1983,7 +1984,7 @@ public void testCompactionStatus() throws Exception {
         String topicName = "persistent://prop-xyz/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index d2fb996a19..803f8af028 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -70,6 +70,7 @@
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -641,7 +642,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
         assertEquals(admin.namespaces().getPersistence("prop-xyz/use/ns1"), new PersistencePolicies(3, 2, 1, 10.0));
 
         // Force topic creation and namespace being loaded
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/my-topic").create();
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns1/my-topic").create();
         producer.close();
         admin.topics().delete("persistent://prop-xyz/use/ns1/my-topic");
 
@@ -674,7 +675,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
         }
 
         // Force topic creation and namespace being loaded
-        producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
+        producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
         producer.close();
         admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
 
@@ -806,7 +807,7 @@ public void partitionedTopics(String topicName) throws Exception {
         admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
         assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -864,7 +865,7 @@ public void partitionedTopics(String topicName) throws Exception {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer()
+        producer = client.newProducer(Schema.BYTES)
             .topic(partitionedTopicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -926,7 +927,7 @@ public void testNamespaceSplitBundle() throws Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -956,7 +957,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1080,7 +1081,7 @@ public void testNamespaceUnloadBundle() throws Exception {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1141,7 +1142,7 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1197,7 +1198,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1210,7 +1211,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = pulsarClient.newProducer()
+        Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1-bundles/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1307,7 +1308,7 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages) th
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1356,7 +1357,7 @@ public void statsOnNonExistingTopics() throws Exception {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/use/ns1/my-topic";
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1748,7 +1749,7 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
         Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://prop-xyz/use/ns1/ds1")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -1939,7 +1940,7 @@ public void testTriggerCompaction() throws Exception {
         String topicName = "persistent://prop-xyz/use/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         // mock actual compaction, we don't need to really run it
@@ -1973,7 +1974,7 @@ public void testCompactionStatus() throws Exception {
         String topicName = "persistent://prop-xyz/use/ns1/topic1";
 
         // create a topic by creating a producer
-        pulsarClient.newProducer().topic(topicName).create().close();
+        pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 5c64ac8038..aa6dd7150c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Message;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 6a7953cb47..8a583217ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -176,8 +176,9 @@ public void testMultipleBrokerLookup() throws Exception {
         // load namespace-bundle by calling Broker2
         Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+            .topic("persistent://my-property/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -263,8 +264,9 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception {
         // load namespace-bundle by calling Broker2
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+            .topic("persistent://my-property2/use2/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -336,7 +338,8 @@ public void testPartitionTopicLookup() throws Exception {
         loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
         /**** broker-2 started ****/
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+            .topic(topicName.toString())
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
@@ -489,8 +492,9 @@ public void testDiscoveryLookup() throws Exception {
         PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(discoverySvcUrl).build();
         Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+            .topic("persistent://my-property2/use2/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -560,8 +564,9 @@ public void testDiscoveryLookupTls() throws Exception {
                 .enableTls(true).allowTlsInsecureConnection(true).build();
         Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+            .topic("persistent://my-property2/use2/my-ns/my-topic1")
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -635,8 +640,9 @@ public void start() throws PulsarClientException {
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth).build();
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use2/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use2/my-ns/my-topic1")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+            .topic("persistent://my-property/use2/my-ns/my-topic1")
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index e6de6bf62d..7d524a9f84 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -20,17 +20,34 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 public class SimpleSchemaTest extends ProducerConsumerBase {
 
+    @DataProvider(name = "schemaValidationModes")
+    public static Object[][] schemaValidationModes() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    private final boolean schemaValidationEnforced;
+
+    @Factory(dataProvider = "schemaValidationModes")
+    public SimpleSchemaTest(boolean schemaValidationEnforced) {
+        this.schemaValidationEnforced = schemaValidationEnforced;
+    }
+
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        conf.setSchemaValidationEnforced(schemaValidationEnforced);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -130,10 +147,27 @@ public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
             p.send(new V1Data(0));
         }
 
-        try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create()) {
-            Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema");
+        try (Producer<byte[]> p = pulsarClient.newProducer(Schema.BYTES).topic(topic).create()) {
+            if (!schemaValidationEnforced) {
+                p.send("junkdata".getBytes(UTF_8));
+            } else {
+                Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema"
+                    + " if SchemaValidationEnabled is enabled");
+            }
+        } catch (PulsarClientException e) {
+            if (schemaValidationEnforced) {
+                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+            } else {
+                Assert.fail("Shouldn't throw IncompatibleSchemaException"
+                    + " if SchemaValidationEnforced is disabled");
+            }
+        }
+
+        // if using AUTO_PRODUCE_BYTES, producer can connect but the publish will fail
+        try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create()) {
+            p.send("junkdata".getBytes(UTF_8));
         } catch (PulsarClientException e) {
-            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+            assertTrue(e.getCause() instanceof SchemaSerializationException);
         }
     }
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 5432d43108..5872db0264 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -40,6 +40,22 @@
  */
 public interface Schema<T> {
 
+    /**
+     * Check if the message is a valid object for this schema.
+     *
+     * <p>The implementation can choose what its most efficient approach to validate the schema.
+     * If the implementation doesn't provide it, it will attempt to use {@link #decode(byte[])}
+     * to see if this schema can decode this message or not as a validation mechanism to verify
+     * the bytes.
+     *
+     * @param message the messages to verify
+     * @return true if it is a valid message
+     * @throws SchemaSerializationException if it is not a valid message
+     */
+    default void validate(byte[] message) {
+        decode(message);
+    }
+
     /**
      * Encode an object representing the message content into a byte array.
      *
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index dc90931d4f..e795c7078d 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -40,6 +40,13 @@ private void ensureSchemaInitialized() {
         checkState(null != schema, "Schema is not initialized before used");
     }
 
+    @Override
+    public void validate(byte[] message) {
+        ensureSchemaInitialized();
+
+        schema.validate(message);
+    }
+
     @Override
     public byte[] encode(GenericRecord message) {
         ensureSchemaInitialized();
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 36520a9ee7..517d53a2fe 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -23,28 +23,42 @@
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * Auto detect schema.
  */
 public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
+    private boolean requireSchemaValidation = true;
     private Schema<T> schema;
 
     public void setSchema(Schema<T> schema) {
         this.schema = schema;
+        this.requireSchemaValidation = schema.getSchemaInfo() != null
+            && SchemaType.BYTES != schema.getSchemaInfo().getType()
+            && SchemaType.NONE != schema.getSchemaInfo().getType();
     }
 
     private void ensureSchemaInitialized() {
         checkState(null != schema, "Schema is not initialized before used");
     }
 
+    @Override
+    public void validate(byte[] message) {
+        ensureSchemaInitialized();
+
+        schema.validate(message);
+    }
+
     @Override
     public byte[] encode(byte[] message) {
         ensureSchemaInitialized();
 
-        // verify if the message can be decoded by the underlying schema
-        schema.decode(message);
+        if (requireSchemaValidation) {
+            // verify if the message can be decoded by the underlying schema
+            schema.validate(message);
+        }
 
         return message;
     }
@@ -53,8 +67,10 @@ private void ensureSchemaInitialized() {
     public byte[] decode(byte[] bytes) {
         ensureSchemaInitialized();
 
-        // verify the message can be detected by the underlying schema
-        schema.decode(bytes);
+        if (requireSchemaValidation) {
+            // verify the message can be detected by the underlying schema
+            schema.decode(bytes);
+        }
 
         return bytes;
     }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index da82216df8..d4635f277b 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -38,6 +38,12 @@ public static ByteSchema of() {
         .setType(SchemaType.INT8)
         .setSchema(new byte[0]);
 
+    @Override
+    public void validate(byte[] message) {
+        if (message.length != 1) {
+            throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
+        }
+    }
 
     @Override
     public byte[] encode(Byte message) {
@@ -53,9 +59,7 @@ public Byte decode(byte[] bytes) {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 1) {
-            throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
-        }
+        validate(bytes);
         return bytes[0];
     }
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index 8ffd9d3ac5..e617efba92 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -38,6 +38,13 @@ public static DoubleSchema of() {
         .setType(SchemaType.DOUBLE)
         .setSchema(new byte[0]);
 
+    @Override
+    public void validate(byte[] message) {
+        if (message.length != 8) {
+            throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
+        }
+    }
+
     @Override
     public byte[] encode(Double message) {
         if (null == message) {
@@ -62,9 +69,7 @@ public Double decode(byte[] bytes) {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 8) {
-            throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
-        }
+        validate(bytes);
         long value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index b7c61fb9c1..32ac469fb5 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -38,6 +38,13 @@ public static FloatSchema of() {
         .setType(SchemaType.FLOAT)
         .setSchema(new byte[0]);
 
+    @Override
+    public void validate(byte[] message) {
+        if (message.length != 4) {
+            throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
+        }
+    }
+
     @Override
     public byte[] encode(Float message) {
         if (null == message) {
@@ -58,9 +65,7 @@ public Float decode(byte[] bytes) {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 4) {
-            throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
-        }
+        validate(bytes);
         int value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index 33bd73b65b..90092a4faf 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -38,6 +38,13 @@ public static IntSchema of() {
         .setType(SchemaType.INT32)
         .setSchema(new byte[0]);
 
+    @Override
+    public void validate(byte[] message) {
+        if (message.length != 4) {
+            throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
+        }
+    }
+
     @Override
     public byte[] encode(Integer message) {
         if (null == message) {
@@ -57,9 +64,7 @@ public Integer decode(byte[] bytes) {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 4) {
-            throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
-        }
+        validate(bytes);
         int value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index e82a901bb7..b252279115 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -38,6 +38,13 @@ public static LongSchema of() {
         .setType(SchemaType.INT64)
         .setSchema(new byte[0]);
 
+    @Override
+    public void validate(byte[] message) {
+        if (message.length != 8) {
+            throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
+        }
+    }
+
     @Override
     public byte[] encode(Long data) {
         if (null == data) {
@@ -61,9 +68,7 @@ public Long decode(byte[] bytes) {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 8) {
-            throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
-        }
+        validate(bytes);
         long value = 0L;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index fc73b89470..f1ec1339af 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -38,6 +38,13 @@ public static ShortSchema of() {
         .setType(SchemaType.INT16)
         .setSchema(new byte[0]);
 
+    @Override
+    public void validate(byte[] message) {
+        if (message.length != 2) {
+            throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
+        }
+    }
+
     @Override
     public byte[] encode(Short message) {
         if (null == message) {
@@ -55,9 +62,7 @@ public Short decode(byte[] bytes) {
         if (null == bytes) {
             return null;
         }
-        if (bytes.length != 2) {
-            throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
-        }
+        validate(bytes);
         short value = 0;
         for (byte b : bytes) {
             value <<= 8;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index eb44188acd..74e09e735a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,13 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
 /**
  * Class that provides a client interface to Pulsar.
@@ -87,7 +84,6 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      * Producer producer = client.newProducer().topic(myTopic).create();
      * </code>
      *
-     *
      * @return a {@link ProducerBuilder} object to configure and construct the {@link Producer} instance
      *
      * @since 2.0.0
@@ -124,9 +120,11 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
     /**
      * Create a consumer with default for subscribing on a specific topic
      *
+     * Since 2.2, if you are creating a consumer with non-bytes schema on a non-existence topic, it will
+     * automatically create the topic with the provided schema.
+     *
      * @param schema
      *          provide a way to convert between serialized data and domain objects
-     *
      * @return a {@link ConsumerBuilder} object to configure and construct the {@link Consumer} instance
      *
      * @since 2.0.0
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 1e57f73686..c31773fcb0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -36,6 +36,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -170,7 +171,7 @@ public void testTlsSyncProducerAndConsumer() throws Exception {
 
         Consumer<byte[]> consumer = proxyClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = proxyClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/my-topic1")
                 .create();
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 85a1f8463a..cd3862877e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -247,13 +248,13 @@ void testAuthentication() throws Exception {
 
 		// Step 3: Pass correct client params
 		PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
-		proxyClient.newProducer().topic(topicName).create();
+		proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
 		// Sleep for 4 seconds - wait for proxy auth params to expire
 		Thread.sleep(4 * 1000);
-		proxyClient.newProducer().topic(topicName).create();
+		proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
 		// Sleep for 3 seconds - wait for client auth parans to expire
 		Thread.sleep(3 * 1000);
-		proxyClient.newProducer().topic(topicName).create();
+		proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
 		proxyClient.close();
 		proxyService.close();
 	}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 0923ded499..3e28242092 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public void testInboundConnection() throws Exception {
         LOG.info("Creating producer 1");
         PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer1 = client1.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+        Producer<byte[]> producer1 = client1.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
 
         LOG.info("Creating producer 2");
         PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
@@ -79,7 +80,7 @@ public void testInboundConnection() throws Exception {
         Producer<byte[]> producer2;
         Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
         try {
-            producer2 = client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+            producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
             producer2.send("Message 1".getBytes());
             Assert.fail("Should have failed since max num of connections is 2 and the first producer used them all up - one for discovery and other for producing.");
         } catch (Exception ex) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 09912ec4f1..35dfb94374 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -26,6 +26,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -73,11 +74,11 @@ public void testLookup() throws Exception {
                 .connectionsPerBroker(5).ioThreads(5).build();
         assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
         assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
-        Producer<byte[]> producer1 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+        Producer<byte[]> producer1 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                 .create();
         assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
         try {
-            Producer<byte[]> producer2 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+            Producer<byte[]> producer2 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                     .create();
             Assert.fail("Should have failed since can't acquire LookupRequestSemaphore");
         } catch (Exception ex) {
@@ -86,7 +87,7 @@ public void testLookup() throws Exception {
         Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 1.0d);
         proxyService.getLookupRequestSemaphore().release();
         try {
-            Producer<byte[]> producer3 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+            Producer<byte[]> producer3 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                     .create();
         } catch (Exception ex) {
             Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 391a5bfb43..99d79fb639 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -34,6 +34,7 @@
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.mockito.Mockito;
@@ -80,7 +81,7 @@ protected void cleanup() throws Exception {
     public void testProducer() throws Exception {
         PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
                 .create();
 
         for (int i = 0; i < 10; i++) {
@@ -94,7 +95,7 @@ public void testProducer() throws Exception {
     public void testProducerConsumer() throws Exception {
         PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://sample/test/local/producer-consumer-topic")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -128,7 +129,7 @@ public void testPartitions() throws Exception {
                 .build();
         admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
 
-        Producer<byte[]> producer = client.newProducer()
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
             .topic("persistent://sample/test/local/partitioned-topic")
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
@@ -177,7 +178,7 @@ public void testRegexSubscription() throws Exception {
 
             final int numMessages = 20;
 
-            try (Producer<byte[]> producer = client.newProducer()
+            try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                 .topic("persistent://sample/test/local/topic1")
                 .create()) {
                 for (int i = 0; i < numMessages; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 116814bf66..db5070fd32 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.mockito.Mockito;
@@ -84,7 +85,7 @@ public void testProducer() throws Exception {
         PulsarClient client = PulsarClient.builder()
                 .serviceUrl("pulsar+ssl://localhost:" + proxyConfig.getServicePortTls()).enableTls(true)
                 .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build();
-        Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/topic").create();
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/topic").create();
 
         for (int i = 0; i < 10; i++) {
             producer.send("test".getBytes());
@@ -101,7 +102,7 @@ public void testPartitions() throws Exception {
         admin.tenants().createTenant("sample", new TenantInfo());
         admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
 
-        Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/partitioned-topic")
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/partitioned-topic")
                 .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // Create a consumer directly attached to broker
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 9ec8ced961..5ee0dac24e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -36,6 +36,7 @@
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -186,13 +187,13 @@ public void testProxyAuthorization() throws Exception {
         }
         Producer<byte[]> producer;
         try {
-            producer = proxyClient.newProducer()
+            producer = proxyClient.newProducer(Schema.BYTES)
                     .topic("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1").create();
         } catch (Exception ex) {
             // expected
             admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
                     Sets.newHashSet(AuthAction.produce, AuthAction.consume));
-            producer = proxyClient.newProducer()
+            producer = proxyClient.newProducer(Schema.BYTES)
                     .topic("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1").create();
         }
         final int msgs = 10;
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 0138de6e3e..d894b0833f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -38,6 +38,7 @@
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -241,7 +242,7 @@ public void testProxyAuthorization() throws Exception {
                 .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
 
-        Producer<byte[]> producer = proxyClient.newProducer()
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
                 .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 050aeec89d..4909a89d65 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -164,7 +165,7 @@ public void testDiscoveryService() throws Exception {
         Consumer<byte[]> consumer = proxyClient.newConsumer()
                 .topic("persistent://my-property/without-service-discovery/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = proxyClient.newProducer()
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
                 .topic("persistent://my-property/without-service-discovery/my-ns/my-topic1").create();
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index adde9f2798..653c076dc5 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -73,12 +73,6 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client-kafka</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>com.datastax.cassandra</groupId>
       <artifactId>cassandra-driver-core</artifactId>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index 0f17aac437..3dd8940416 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -54,6 +54,7 @@
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.testng.annotations.Test;
 
+@Test(enabled = false)
 @Slf4j
 public class KafkaApiTest extends PulsarTestSuite {
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services