You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/20 10:36:46 UTC
[pulsar] branch master updated: AutoConsumeSchema: handle schema NONE as BYTES (#10277)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bfe5dd7 AutoConsumeSchema: handle schema NONE as BYTES (#10277)
bfe5dd7 is described below
commit bfe5dd7e692217980e0ee38ee7b397ffef485b47
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Apr 20 12:35:37 2021 +0200
AutoConsumeSchema: handle schema NONE as BYTES (#10277)
---
.../java/org/apache/pulsar/schema/SchemaTest.java | 23 ++++++++++++++++++++--
.../client/impl/schema/AutoConsumeSchema.java | 1 +
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 70b6fb7..c094d6a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -258,7 +259,16 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
}
@Test
- public void testUseAutoConsumeWithSchemalessTopic() throws Exception {
+ public void testUseAutoConsumeWithBytesSchemaTopic() throws Exception {
+ testUseAutoConsumeWithSchemalessTopic(SchemaType.BYTES);
+ }
+
+ @Test
+ public void testUseAutoConsumeWithNoneSchemaTopic() throws Exception {
+ testUseAutoConsumeWithSchemalessTopic(SchemaType.NONE);
+ }
+
+ private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicName = "test-schemaless";
@@ -275,6 +285,15 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic(topic, 2);
+ // set schema
+ SchemaInfo schemaInfo = SchemaInfo
+ .builder()
+ .schema(new byte[0])
+ .name("dummySchema")
+ .type(schema)
+ .build();
+ admin.schemas().createSchema(topic, schemaInfo);
+
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
@@ -286,7 +305,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
.subscribe();
// use GenericRecord even for primitive types
- // it will be a PrimitiveRecord
+ // it will be a GenericObjectWrapper
Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test-sub3")
.topic(topic)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index ec37f67..62ca026 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -160,6 +160,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
case BOOLEAN:
return BooleanSchema.of();
case BYTES:
+ case NONE:
return BytesSchema.of();
case DATE:
return DateSchema.of();