You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:21 UTC
[pulsar] 27/46: GenericObject: handle KeyValue with SEPARATED
encoding and add more tests (#10186)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 72295b67beee0ba31297518f342ba80dfa197a61
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon Apr 12 11:12:14 2021 +0200
GenericObject: handle KeyValue with SEPARATED encoding and add more tests (#10186)
---
.../java/org/apache/pulsar/schema/SchemaTest.java | 96 ++++++++++++++++++++--
.../org/apache/pulsar/client/impl/MessageImpl.java | 27 ++++--
.../client/impl/schema/AutoConsumeSchema.java | 10 ++-
3 files changed, 116 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 56ca04c..7672cce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -25,6 +25,8 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@@ -307,10 +309,19 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
}
@Test
- public void testKeyValueSchema() throws Exception {
+ public void testKeyValueSchemaINLINE() throws Exception {
+ testKeyValueSchema(KeyValueEncodingType.INLINE);
+ }
+
+ @Test
+ public void testKeyValueSchemaSEPARATED() throws Exception {
+ testKeyValueSchema(KeyValueEncodingType.SEPARATED);
+ }
+
+ private void testKeyValueSchema(KeyValueEncodingType keyValueEncodingType) throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
- final String topicName = "test-string-schema";
+ final String topicName = "test-kv-schema-" + randomName(16);
final String topic = TopicName.get(
TopicDomain.persistent.value(),
@@ -325,18 +336,16 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic(topic, 2);
Producer<KeyValue<String, Integer>> producer = pulsarClient
- .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+ .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
.topic(topic)
.create();
- producer.send(new KeyValue<>("foo", 123));
-
- Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+ Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
.subscriptionName("test-sub")
.topic(topic)
.subscribe();
- Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) // keyValueEncodingType autodetected
.subscriptionName("test-sub2")
.topic(topic)
.subscribe();
@@ -347,6 +356,79 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
Message<GenericRecord> message2 = consumer2.receive();
assertEquals(message.getValue(), message2.getValue().getNativeObject());
+ if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+ // with SEPARATED encoding the routing key is the key of the KeyValue
+ assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), message.getKeyBytes());
+ assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), message2.getKeyBytes());
+ } else {
+ assertNull(message.getKey());
+ assertNull(message2.getKey());
+ }
+
+ producer.close();
+ consumer.close();
+ consumer2.close();
+ }
+
+ @Test
+ public void testKeyValueSchemaWithStructsINLINE() throws Exception {
+ testKeyValueSchemaWithStructs(KeyValueEncodingType.INLINE); // run the AVRO-struct key/value helper, not the String/Integer one
+ }
+
+ @Test
+ public void testKeyValueSchemaWithStructsSEPARATED() throws Exception {
+ testKeyValueSchemaWithStructs(KeyValueEncodingType.SEPARATED); // run the AVRO-struct key/value helper, not the String/Integer one
+ }
+
+ private void testKeyValueSchemaWithStructs(KeyValueEncodingType keyValueEncodingType) throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicName = "test-kv-schema-" + randomName(16);
+
+ final String topic = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topicName).toString();
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME));
+
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ Producer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> producer = pulsarClient
+ .newProducer(Schema.KeyValue(Schema.AVRO(Schemas.PersonOne.class), Schema.AVRO(Schemas.PersonTwo.class), keyValueEncodingType))
+ .topic(topic)
+ .create();
+
+ Consumer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.AVRO(Schemas.PersonOne.class), Schema.AVRO(Schemas.PersonTwo.class), keyValueEncodingType))
+ .subscriptionName("test-sub")
+ .topic(topic)
+ .subscribe();
+
+ Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) // keyValueEncodingType autodetected
+ .subscriptionName("test-sub2")
+ .topic(topic)
+ .subscribe();
+
+ Schemas.PersonOne key = new Schemas.PersonOne(8787);
+ Schemas.PersonTwo value = new Schemas.PersonTwo(323, "foo");
+ producer.send(new KeyValue<>(key, value));
+
+ Message<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> message = consumer.receive();
+ Message<GenericRecord> message2 = consumer2.receive();
+ assertEquals(message.getValue(), message2.getValue().getNativeObject());
+
+ if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+ // with SEPARATED encoding the routing key is the key of the KeyValue
+ assertNotNull(message.getKeyBytes());
+ assertNotNull(message2.getKeyBytes());
+ } else {
+ assertNull(message.getKey());
+ assertNull(message2.getKey());
+ }
+
producer.close();
consumer.close();
consumer2.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f7d8cf9..3cb2235 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -319,9 +319,14 @@ public class MessageImpl<T> implements Message<T> {
KeyValueSchema kvSchema = getKeyValueSchema();
byte[] schemaVersion = getSchemaVersion();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
- return (T) kvSchema.decode(
- msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
- msgMetadataBuilder.hasNullValue() ? null : getData(), schemaVersion);
+ org.apache.pulsar.common.schema.KeyValue keyValue =
+ (org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
+ if (schema instanceof AutoConsumeSchema) {
+ return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
+ schema.getSchemaInfo().getType(), schemaVersion);
+ } else {
+ return (T) keyValue;
+ }
} else {
return schema.decode(getData(), schemaVersion);
}
@@ -330,9 +335,14 @@ public class MessageImpl<T> implements Message<T> {
private T getKeyValue() {
KeyValueSchema kvSchema = getKeyValueSchema();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
- return (T) kvSchema.decode(
- msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
- msgMetadataBuilder.hasNullValue() ? null : getData(), null);
+ org.apache.pulsar.common.schema.KeyValue keyValue =
+ (org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), null);
+ if (schema instanceof AutoConsumeSchema) {
+ return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
+ schema.getSchemaInfo().getType(), null);
+ } else {
+ return (T) keyValue;
+ }
} else {
return schema.decode(getData());
}
@@ -420,8 +430,9 @@ public class MessageImpl<T> implements Message<T> {
@Override
public byte[] getKeyBytes() {
- checkNotNull(msgMetadataBuilder);
- if (hasBase64EncodedKey()) {
+ if (!msgMetadata.hasPartitionKey() || msgMetadata.isNullPartitionKey()) {
+ return null;
+ } else if (hasBase64EncodedKey()) {
return Base64.getDecoder().decode(getKey());
} else {
return getKey().getBytes(UTF_8);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 647a87b..41b1260 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -21,12 +21,14 @@ package org.apache.pulsar.client.impl.schema;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.ExecutionException;
@@ -247,10 +249,14 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
if (this.schema == null) {
throw new IllegalStateException("Cannot decode a message without schema");
}
- return GenericObjectWrapper.of(value,
- this.schema.getSchemaInfo().getType(), schemaVersion);
+ return wrapPrimitiveObject(value, schema.getSchemaInfo().getType(), schemaVersion);
}
+ public static GenericRecord wrapPrimitiveObject(Object value, SchemaType type, byte[] schemaVersion) {
+ return GenericObjectWrapper.of(value, type, schemaVersion);
+ }
+
+
public Schema<?> getInternalSchema() {
return schema;
}