You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/07 23:42:23 UTC
[pulsar] branch master updated: [schema] provide a flag to
disable/enable schema validation on broker (#2730)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0da4e4a [schema] provide a flag to disable/enable schema validation on broker (#2730)
0da4e4a is described below
commit 0da4e4af407650de874ed6c9de891d98aa540aad
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Sun Oct 7 16:42:18 2018 -0700
[schema] provide a flag to disable/enable schema validation on broker (#2730)
*Motivation*
We need an upgrade/backward compatibility story for schema enforcement. Especially around:
- `Producers cannot connect without a schema to topics with a schema`
*Changes*
- provide a flag on brokers to enable schema validation (and disabled it by default). this allows a smooth upgrade on brokers,
otherwise, it will break all non-java producers on topics with schema immediately when upgrade to the new version
---
conf/broker.conf | 7 ++++
.../apache/pulsar/broker/ServiceConfiguration.java | 9 +++++
.../apache/pulsar/broker/service/ServerCnx.java | 4 ++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 31 +++++++++--------
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 31 +++++++++--------
.../broker/auth/MockedPulsarServiceBaseTest.java | 1 +
.../pulsar/client/api/BrokerServiceLookupTest.java | 28 +++++++++------
.../apache/pulsar/client/api/SimpleSchemaTest.java | 40 ++++++++++++++++++++--
.../java/org/apache/pulsar/client/api/Schema.java | 16 +++++++++
.../client/impl/schema/AutoConsumeSchema.java | 7 ++++
.../client/impl/schema/AutoProduceBytesSchema.java | 24 ++++++++++---
.../pulsar/client/impl/schema/ByteSchema.java | 10 ++++--
.../pulsar/client/impl/schema/DoubleSchema.java | 11 ++++--
.../pulsar/client/impl/schema/FloatSchema.java | 11 ++++--
.../pulsar/client/impl/schema/IntSchema.java | 11 ++++--
.../pulsar/client/impl/schema/LongSchema.java | 11 ++++--
.../pulsar/client/impl/schema/ShortSchema.java | 11 ++++--
.../org/apache/pulsar/client/api/PulsarClient.java | 8 ++---
.../ProxyAuthenticatedProducerConsumerTest.java | 3 +-
.../proxy/server/ProxyAuthenticationTest.java | 7 ++--
.../server/ProxyConnectionThrottlingTest.java | 5 +--
.../proxy/server/ProxyLookupThrottlingTest.java | 7 ++--
.../org/apache/pulsar/proxy/server/ProxyTest.java | 9 ++---
.../apache/pulsar/proxy/server/ProxyTlsTest.java | 5 +--
.../server/ProxyWithAuthorizationNegTest.java | 5 +--
.../proxy/server/ProxyWithAuthorizationTest.java | 3 +-
.../server/ProxyWithoutServiceDiscoveryTest.java | 3 +-
tests/integration/pom.xml | 6 ----
.../integration/compat/kafka/KafkaApiTest.java | 1 +
29 files changed, 228 insertions(+), 97 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index f4a57b6..867dcb3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -517,6 +517,13 @@ exposePublisherStats=true
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
+# Enforce schema validation on following cases:
+#
+# - if a producer without a schema attempts to produce to a topic with schema, the producer will be
+# failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.
+# if you enable this setting, it will cause non-java clients failed to produce.
+isSchemaValidationEnforced=false
+
### --- Ledger Offloading --- ###
# The directory for all the offloader implementations
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e436ff4..3ddd08c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -472,6 +472,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Interval between checks to see if topics with compaction policies need to be compacted
private int brokerServiceCompactionMonitorIntervalInSeconds = 60;
+ private boolean isSchemaValidationEnforced = false;
private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
@@ -1658,6 +1659,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.exposeConsumerLevelMetricsInPrometheus = exposeConsumerLevelMetricsInPrometheus;
}
+ public boolean isSchemaValidationEnforced() {
+ return isSchemaValidationEnforced;
+ }
+
+ public void setSchemaValidationEnforced(boolean enforced) {
+ this.isSchemaValidationEnforced = enforced;
+ }
+
public String getSchemaRegistryStorageClassName() {
return schemaRegistryStorageClassName;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 500ee97..483bf66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -129,6 +129,7 @@ public class ServerCnx extends PulsarHandler {
private String originalPrincipal = null;
private Set<String> proxyRoles;
private boolean authenticateOriginalAuthData;
+ private final boolean schemaValidationEnforced;
enum State {
Start, Connected, Failed
@@ -148,6 +149,7 @@ public class ServerCnx extends PulsarHandler {
.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
this.authenticateOriginalAuthData = service.pulsar().getConfiguration().authenticateOriginalAuthData();
+ this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
}
@Override
@@ -833,7 +835,7 @@ public class ServerCnx extends PulsarHandler {
} else {
schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
- if (hasSchema) {
+ if (hasSchema && schemaValidationEnforced) {
result.completeExceptionally(new IncompatibleSchemaException(
"Producers cannot connect without a schema to topics with a schema"));
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 187539f..5c18dfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -648,7 +649,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0));
// Force topic creation and namespace being loaded
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/my-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -685,7 +686,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
}
// Force topic creation and namespace being loaded
- producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
+ producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
producer.close();
admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
@@ -836,7 +837,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
- Producer<byte[]> producer = client.newProducer()
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -894,7 +895,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
} catch (ConflictException ce) {
}
- producer = client.newProducer()
+ producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -957,7 +958,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -987,7 +988,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1084,7 +1085,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
Lists.newArrayList("my-sub"));
// Create producer
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1146,7 +1147,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
Lists.newArrayList("my-sub"));
// Create producer
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1203,7 +1204,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
.subscribe();
// Create producer
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1216,7 +1217,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
producer.close();
// Create producer
- Producer<byte[]> producer1 = pulsarClient.newProducer()
+ Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1314,7 +1315,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
}
private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1363,7 +1364,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeleteFailedReturnCode() throws Exception {
String topicName = "persistent://prop-xyz/ns1/my-topic";
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1755,7 +1756,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
.subscriptionName("my-sub").subscribe();
- Producer<byte[]> producer = client.newProducer()
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -1949,7 +1950,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
String topicName = "persistent://prop-xyz/ns1/topic1";
// create a topic by creating a producer
- pulsarClient.newProducer().topic(topicName).create().close();
+ pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// mock actual compaction, we don't need to really run it
@@ -1983,7 +1984,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
String topicName = "persistent://prop-xyz/ns1/topic1";
// create a topic by creating a producer
- pulsarClient.newProducer().topic(topicName).create().close();
+ pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
assertEquals(admin.topics().compactionStatus(topicName).status,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index d2fb996..803f8af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -641,7 +642,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.namespaces().getPersistence("prop-xyz/use/ns1"), new PersistencePolicies(3, 2, 1, 10.0));
// Force topic creation and namespace being loaded
- Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/my-topic").create();
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns1/my-topic").create();
producer.close();
admin.topics().delete("persistent://prop-xyz/use/ns1/my-topic");
@@ -674,7 +675,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
}
// Force topic creation and namespace being loaded
- producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
+ producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
producer.close();
admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
@@ -806,7 +807,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
- Producer<byte[]> producer = client.newProducer()
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -864,7 +865,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
} catch (ConflictException ce) {
}
- producer = client.newProducer()
+ producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -926,7 +927,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
// Force to create a topic
final String namespace = "prop-xyz/use/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -956,7 +957,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
// Force to create a topic
final String namespace = "prop-xyz/use/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1080,7 +1081,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
Lists.newArrayList("my-sub"));
// Create producer
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1141,7 +1142,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
Lists.newArrayList("my-sub"));
// Create producer
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1197,7 +1198,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
.subscribe();
// Create producer
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1210,7 +1211,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
producer.close();
// Create producer
- Producer<byte[]> producer1 = pulsarClient.newProducer()
+ Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1-bundles/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1307,7 +1308,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
}
private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1356,7 +1357,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeleteFailedReturnCode() throws Exception {
String topicName = "persistent://prop-xyz/use/ns1/my-topic";
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -1748,7 +1749,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1")
.subscriptionName("my-sub").subscribe();
- Producer<byte[]> producer = client.newProducer()
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
@@ -1939,7 +1940,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
String topicName = "persistent://prop-xyz/use/ns1/topic1";
// create a topic by creating a producer
- pulsarClient.newProducer().topic(topicName).create().close();
+ pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// mock actual compaction, we don't need to really run it
@@ -1973,7 +1974,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
String topicName = "persistent://prop-xyz/use/ns1/topic1";
// create a topic by creating a producer
- pulsarClient.newProducer().topic(topicName).create().close();
+ pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
assertEquals(admin.topics().compactionStatus(topicName).status,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 5c64ac80..aa6dd71 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Message;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 6a7953c..8a58321 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -176,8 +176,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// load namespace-bundle by calling Broker2
Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
- .create();
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic("persistent://my-property/my-ns/my-topic1")
+ .create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -263,8 +264,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// load namespace-bundle by calling Broker2
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
- .create();
+ Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+ .topic("persistent://my-property2/use2/my-ns/my-topic1")
+ .create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -336,7 +338,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
/**** broker-2 started ****/
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topicName.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
@@ -489,8 +492,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(discoverySvcUrl).build();
Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
- .create();
+ Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+ .topic("persistent://my-property2/use2/my-ns/my-topic1")
+ .create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -560,8 +564,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
.enableTls(true).allowTlsInsecureConnection(true).build();
Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = pulsarClient2.newProducer().topic("persistent://my-property2/use2/my-ns/my-topic1")
- .create();
+ Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES)
+ .topic("persistent://my-property2/use2/my-ns/my-topic1")
+ .create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -635,8 +640,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth).build();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use2/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use2/my-ns/my-topic1")
- .create();
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic("persistent://my-property/use2/my-ns/my-topic1")
+ .create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index e6de6bf..7d524a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -20,17 +20,34 @@ package org.apache.pulsar.client.api;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
import org.testng.annotations.Test;
public class SimpleSchemaTest extends ProducerConsumerBase {
+ @DataProvider(name = "schemaValidationModes")
+ public static Object[][] schemaValidationModes() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ private final boolean schemaValidationEnforced;
+
+ @Factory(dataProvider = "schemaValidationModes")
+ public SimpleSchemaTest(boolean schemaValidationEnforced) {
+ this.schemaValidationEnforced = schemaValidationEnforced;
+ }
+
+
@BeforeMethod
@Override
protected void setup() throws Exception {
+ conf.setSchemaValidationEnforced(schemaValidationEnforced);
super.internalSetup();
super.producerBaseSetup();
}
@@ -130,10 +147,27 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
p.send(new V1Data(0));
}
- try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create()) {
- Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema");
+ try (Producer<byte[]> p = pulsarClient.newProducer(Schema.BYTES).topic(topic).create()) {
+ if (!schemaValidationEnforced) {
+ p.send("junkdata".getBytes(UTF_8));
+ } else {
+ Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema"
+ + " if SchemaValidationEnabled is enabled");
+ }
+ } catch (PulsarClientException e) {
+ if (schemaValidationEnforced) {
+ Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+ } else {
+ Assert.fail("Shouldn't throw IncompatibleSchemaException"
+ + " if SchemaValidationEnforced is disabled");
+ }
+ }
+
+ // if using AUTO_PRODUCE_BYTES, producer can connect but the publish will fail
+ try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create()) {
+ p.send("junkdata".getBytes(UTF_8));
} catch (PulsarClientException e) {
- Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+ assertTrue(e.getCause() instanceof SchemaSerializationException);
}
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 5432d43..5872db0 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -41,6 +41,22 @@ import org.apache.pulsar.common.schema.SchemaInfo;
public interface Schema<T> {
/**
+ * Check if the message is a valid object for this schema.
+ *
+ * <p>The implementation can choose what its most efficient approach to validate the schema.
+ * If the implementation doesn't provide it, it will attempt to use {@link #decode(byte[])}
+ * to see if this schema can decode this message or not as a validation mechanism to verify
+ * the bytes.
+ *
+ * @param message the messages to verify
+ * @return true if it is a valid message
+ * @throws SchemaSerializationException if it is not a valid message
+ */
+ default void validate(byte[] message) {
+ decode(message);
+ }
+
+ /**
* Encode an object representing the message content into a byte array.
*
* @param message
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index dc90931..e795c70 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -41,6 +41,13 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
}
@Override
+ public void validate(byte[] message) {
+ ensureSchemaInitialized();
+
+ schema.validate(message);
+ }
+
+ @Override
public byte[] encode(GenericRecord message) {
ensureSchemaInitialized();
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 36520a9..517d53a 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -23,16 +23,21 @@ import static com.google.common.base.Preconditions.checkState;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
/**
* Auto detect schema.
*/
public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
+ private boolean requireSchemaValidation = true;
private Schema<T> schema;
public void setSchema(Schema<T> schema) {
this.schema = schema;
+ this.requireSchemaValidation = schema.getSchemaInfo() != null
+ && SchemaType.BYTES != schema.getSchemaInfo().getType()
+ && SchemaType.NONE != schema.getSchemaInfo().getType();
}
private void ensureSchemaInitialized() {
@@ -40,11 +45,20 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
}
@Override
+ public void validate(byte[] message) {
+ ensureSchemaInitialized();
+
+ schema.validate(message);
+ }
+
+ @Override
public byte[] encode(byte[] message) {
ensureSchemaInitialized();
- // verify if the message can be decoded by the underlying schema
- schema.decode(message);
+ if (requireSchemaValidation) {
+ // verify if the message can be decoded by the underlying schema
+ schema.validate(message);
+ }
return message;
}
@@ -53,8 +67,10 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
public byte[] decode(byte[] bytes) {
ensureSchemaInitialized();
- // verify the message can be detected by the underlying schema
- schema.decode(bytes);
+ if (requireSchemaValidation) {
+ // verify the message can be detected by the underlying schema
+ schema.decode(bytes);
+ }
return bytes;
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index da82216..d4635f2 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -38,6 +38,12 @@ public class ByteSchema implements Schema<Byte> {
.setType(SchemaType.INT8)
.setSchema(new byte[0]);
+ @Override
+ public void validate(byte[] message) {
+ if (message.length != 1) {
+ throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
+ }
+ }
@Override
public byte[] encode(Byte message) {
@@ -53,9 +59,7 @@ public class ByteSchema implements Schema<Byte> {
if (null == bytes) {
return null;
}
- if (bytes.length != 1) {
- throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
- }
+ validate(bytes);
return bytes[0];
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index 8ffd9d3..e617efb 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -39,6 +39,13 @@ public class DoubleSchema implements Schema<Double> {
.setSchema(new byte[0]);
@Override
+ public void validate(byte[] message) {
+ if (message.length != 8) {
+ throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
+ }
+ }
+
+ @Override
public byte[] encode(Double message) {
if (null == message) {
return null;
@@ -62,9 +69,7 @@ public class DoubleSchema implements Schema<Double> {
if (null == bytes) {
return null;
}
- if (bytes.length != 8) {
- throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
- }
+ validate(bytes);
long value = 0;
for (byte b : bytes) {
value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index b7c61fb..32ac469 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -39,6 +39,13 @@ public class FloatSchema implements Schema<Float> {
.setSchema(new byte[0]);
@Override
+ public void validate(byte[] message) {
+ if (message.length != 4) {
+ throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
+ }
+ }
+
+ @Override
public byte[] encode(Float message) {
if (null == message) {
return null;
@@ -58,9 +65,7 @@ public class FloatSchema implements Schema<Float> {
if (null == bytes) {
return null;
}
- if (bytes.length != 4) {
- throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
- }
+ validate(bytes);
int value = 0;
for (byte b : bytes) {
value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index 33bd73b..90092a4 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -39,6 +39,13 @@ public class IntSchema implements Schema<Integer> {
.setSchema(new byte[0]);
@Override
+ public void validate(byte[] message) {
+ if (message.length != 4) {
+ throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
+ }
+ }
+
+ @Override
public byte[] encode(Integer message) {
if (null == message) {
return null;
@@ -57,9 +64,7 @@ public class IntSchema implements Schema<Integer> {
if (null == bytes) {
return null;
}
- if (bytes.length != 4) {
- throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
- }
+ validate(bytes);
int value = 0;
for (byte b : bytes) {
value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index e82a901..b252279 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -39,6 +39,13 @@ public class LongSchema implements Schema<Long> {
.setSchema(new byte[0]);
@Override
+ public void validate(byte[] message) {
+ if (message.length != 8) {
+ throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
+ }
+ }
+
+ @Override
public byte[] encode(Long data) {
if (null == data) {
return null;
@@ -61,9 +68,7 @@ public class LongSchema implements Schema<Long> {
if (null == bytes) {
return null;
}
- if (bytes.length != 8) {
- throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
- }
+ validate(bytes);
long value = 0L;
for (byte b : bytes) {
value <<= 8;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index fc73b89..f1ec133 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -39,6 +39,13 @@ public class ShortSchema implements Schema<Short> {
.setSchema(new byte[0]);
@Override
+ public void validate(byte[] message) {
+ if (message.length != 2) {
+ throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
+ }
+ }
+
+ @Override
public byte[] encode(Short message) {
if (null == message) {
return null;
@@ -55,9 +62,7 @@ public class ShortSchema implements Schema<Short> {
if (null == bytes) {
return null;
}
- if (bytes.length != 2) {
- throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
- }
+ validate(bytes);
short value = 0;
for (byte b : bytes) {
value <<= 8;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index eb44188..74e09e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,13 +19,10 @@
package org.apache.pulsar.client.api;
import java.io.Closeable;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
/**
* Class that provides a client interface to Pulsar.
@@ -87,7 +84,6 @@ public interface PulsarClient extends Closeable {
* Producer producer = client.newProducer().topic(myTopic).create();
* </code>
*
- *
* @return a {@link ProducerBuilder} object to configure and construct the {@link Producer} instance
*
* @since 2.0.0
@@ -124,9 +120,11 @@ public interface PulsarClient extends Closeable {
/**
* Create a consumer with default for subscribing on a specific topic
*
+ * Since 2.2, if you are creating a consumer with non-bytes schema on a non-existence topic, it will
+ * automatically create the topic with the provided schema.
+ *
* @param schema
* provide a way to convert between serialized data and domain objects
- *
* @return a {@link ConsumerBuilder} object to configure and construct the {@link Consumer} instance
*
* @since 2.0.0
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 1e57f73..c31773f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -170,7 +171,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
Consumer<byte[]> consumer = proxyClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = proxyClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
+ Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/my-topic1")
.create();
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 85a1f84..cd38628 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -247,13 +248,13 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
// Step 3: Pass correct client params
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
- proxyClient.newProducer().topic(topicName).create();
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
// Sleep for 4 seconds - wait for proxy auth params to expire
Thread.sleep(4 * 1000);
- proxyClient.newProducer().topic(topicName).create();
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
// Sleep for 3 seconds - wait for client auth parans to expire
Thread.sleep(3 * 1000);
- proxyClient.newProducer().topic(topicName).create();
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
proxyClient.close();
proxyService.close();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 0923ded..3e28242 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
LOG.info("Creating producer 1");
PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
.build();
- Producer<byte[]> producer1 = client1.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+ Producer<byte[]> producer1 = client1.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
LOG.info("Creating producer 2");
PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
@@ -79,7 +80,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
Producer<byte[]> producer2;
Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
try {
- producer2 = client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+ producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
producer2.send("Message 1".getBytes());
Assert.fail("Should have failed since max num of connections is 2 and the first producer used them all up - one for discovery and other for producing.");
} catch (Exception ex) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 09912ec..35dfb94 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -73,11 +74,11 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
.connectionsPerBroker(5).ioThreads(5).build();
assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
- Producer<byte[]> producer1 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+ Producer<byte[]> producer1 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
.create();
assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
try {
- Producer<byte[]> producer2 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+ Producer<byte[]> producer2 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
.create();
Assert.fail("Should have failed since can't acquire LookupRequestSemaphore");
} catch (Exception ex) {
@@ -86,7 +87,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 1.0d);
proxyService.getLookupRequestSemaphore().release();
try {
- Producer<byte[]> producer3 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+ Producer<byte[]> producer3 = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
.create();
} catch (Exception ex) {
Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 391a5bf..99d79fb 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
@@ -80,7 +81,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
public void testProducer() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
.build();
- Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
.create();
for (int i = 0; i < 10; i++) {
@@ -94,7 +95,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
public void testProducerConsumer() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
.build();
- Producer<byte[]> producer = client.newProducer()
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://sample/test/local/producer-consumer-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -128,7 +129,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
.build();
admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
- Producer<byte[]> producer = client.newProducer()
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://sample/test/local/partitioned-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
@@ -177,7 +178,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
final int numMessages = 20;
- try (Producer<byte[]> producer = client.newProducer()
+ try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://sample/test/local/topic1")
.create()) {
for (int i = 0; i < numMessages; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 116814b..db5070f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
@@ -84,7 +85,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://localhost:" + proxyConfig.getServicePortTls()).enableTls(true)
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build();
- Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/topic").create();
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/topic").create();
for (int i = 0; i < 10; i++) {
producer.send("test".getBytes());
@@ -101,7 +102,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
admin.tenants().createTenant("sample", new TenantInfo());
admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
- Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/partitioned-topic")
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/partitioned-topic")
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
// Create a consumer directly attached to broker
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 9ec8ced..5ee0dac 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -186,13 +187,13 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
}
Producer<byte[]> producer;
try {
- producer = proxyClient.newProducer()
+ producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1").create();
} catch (Exception ex) {
// expected
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
Sets.newHashSet(AuthAction.produce, AuthAction.consume));
- producer = proxyClient.newProducer()
+ producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1").create();
}
final int msgs = 10;
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 0138de6..d894b08 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -241,7 +242,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = proxyClient.newProducer()
+ Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 050aeec..4909a89 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -164,7 +165,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
Consumer<byte[]> consumer = proxyClient.newConsumer()
.topic("persistent://my-property/without-service-discovery/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer = proxyClient.newProducer()
+ Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/without-service-discovery/my-ns/my-topic1").create();
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index adde9f2..653c076 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -74,12 +74,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-kafka</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<exclusions>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index 0f17aac..3dd8940 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.Test;
+@Test(enabled = false)
@Slf4j
public class KafkaApiTest extends PulsarTestSuite {