You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:21 UTC

[pulsar] 27/46: GenericObject: handle KeyValue with SEPARATED encoding and add more tests (#10186)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72295b67beee0ba31297518f342ba80dfa197a61
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon Apr 12 11:12:14 2021 +0200

    GenericObject: handle KeyValue with SEPARATED encoding and add more tests (#10186)
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 96 ++++++++++++++++++++--
 .../org/apache/pulsar/client/impl/MessageImpl.java | 27 ++++--
 .../client/impl/schema/AutoConsumeSchema.java      | 10 ++-
 3 files changed, 116 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 56ca04c..7672cce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -25,6 +25,8 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
 import com.google.common.collect.Sets;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -307,10 +309,19 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testKeyValueSchema() throws Exception {
+    public void testKeyValueSchemaINLINE() throws Exception {
+        testKeyValueSchema(KeyValueEncodingType.INLINE);
+    }
+
+    @Test
+    public void testKeyValueSchemaSEPARATED() throws Exception {
+        testKeyValueSchema(KeyValueEncodingType.SEPARATED);
+    }
+
+    private void testKeyValueSchema(KeyValueEncodingType keyValueEncodingType) throws Exception {
         final String tenant = PUBLIC_TENANT;
         final String namespace = "test-namespace-" + randomName(16);
-        final String topicName = "test-string-schema";
+        final String topicName = "test-kv-schema-" + randomName(16);
 
         final String topic = TopicName.get(
                 TopicDomain.persistent.value(),
@@ -325,18 +336,16 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic(topic, 2);
 
         Producer<KeyValue<String, Integer>> producer = pulsarClient
-                .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+                .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
                 .topic(topic)
                 .create();
 
-        producer.send(new KeyValue<>("foo", 123));
-
-        Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+        Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
                 .subscriptionName("test-sub")
                 .topic(topic)
                 .subscribe();
 
-        Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+        Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) // keyValueEncodingType autodetected
                 .subscriptionName("test-sub2")
                 .topic(topic)
                 .subscribe();
@@ -347,6 +356,79 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         Message<GenericRecord> message2 = consumer2.receive();
         assertEquals(message.getValue(), message2.getValue().getNativeObject());
 
+        if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+            // with SEPARATED encoding the routing key is the key of the KeyValue
+            assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), message.getKeyBytes());
+            assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), message2.getKeyBytes());
+        } else {
+            assertNull(message.getKey());
+            assertNull(message2.getKey());
+        }
+
+        producer.close();
+        consumer.close();
+        consumer2.close();
+    }
+
+    @Test
+    public void testKeyValueSchemaWithStructsINLINE() throws Exception {
+        testKeyValueSchemaWithStructs(KeyValueEncodingType.INLINE);
+    }
+
+    @Test
+    public void testKeyValueSchemaWithStructsSEPARATED() throws Exception {
+        testKeyValueSchemaWithStructs(KeyValueEncodingType.SEPARATED);
+    }
+
+    private void testKeyValueSchemaWithStructs(KeyValueEncodingType keyValueEncodingType) throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicName = "test-kv-schema-" + randomName(16);
+
+        final String topic = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicName).toString();
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME));
+
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        Producer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> producer = pulsarClient
+                .newProducer(Schema.KeyValue(Schema.AVRO(Schemas.PersonOne.class), Schema.AVRO(Schemas.PersonTwo.class), keyValueEncodingType))
+                .topic(topic)
+                .create();
+
+        Consumer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.AVRO(Schemas.PersonOne.class), Schema.AVRO(Schemas.PersonTwo.class), keyValueEncodingType))
+                .subscriptionName("test-sub")
+                .topic(topic)
+                .subscribe();
+
+        Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) // keyValueEncodingType autodetected
+                .subscriptionName("test-sub2")
+                .topic(topic)
+                .subscribe();
+
+        Schemas.PersonOne key = new Schemas.PersonOne(8787);
+        Schemas.PersonTwo value = new Schemas.PersonTwo(323, "foo");
+        producer.send(new KeyValue<>(key, value));
+
+        Message<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> message = consumer.receive();
+        Message<GenericRecord> message2 = consumer2.receive();
+        assertEquals(message.getValue(), message2.getValue().getNativeObject());
+
+        if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+            // with SEPARATED encoding the routing key is the key of the KeyValue
+            assertNotNull(message.getKeyBytes());
+            assertNotNull(message2.getKeyBytes());
+        } else {
+            assertNull(message.getKey());
+            assertNull(message2.getKey());
+        }
+
         producer.close();
         consumer.close();
         consumer2.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f7d8cf9..3cb2235 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -319,9 +319,14 @@ public class MessageImpl<T> implements Message<T> {
         KeyValueSchema kvSchema = getKeyValueSchema();
         byte[] schemaVersion = getSchemaVersion();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
-            return (T) kvSchema.decode(
-                    msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
-                    msgMetadataBuilder.hasNullValue() ? null : getData(), schemaVersion);
+            org.apache.pulsar.common.schema.KeyValue keyValue =
+                    (org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
+            if (schema instanceof AutoConsumeSchema) {
+                return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
+                        schema.getSchemaInfo().getType(), schemaVersion);
+            } else {
+                return (T) keyValue;
+            }
         } else {
             return schema.decode(getData(), schemaVersion);
         }
@@ -330,9 +335,14 @@ public class MessageImpl<T> implements Message<T> {
     private T getKeyValue() {
         KeyValueSchema kvSchema = getKeyValueSchema();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
-            return (T) kvSchema.decode(
-                    msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
-                    msgMetadataBuilder.hasNullValue() ? null : getData(), null);
+            org.apache.pulsar.common.schema.KeyValue keyValue =
+                    (org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), null);
+            if (schema instanceof AutoConsumeSchema) {
+                return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
+                        schema.getSchemaInfo().getType(), null);
+            } else {
+                return (T) keyValue;
+            }
         } else {
             return schema.decode(getData());
         }
@@ -420,8 +430,9 @@ public class MessageImpl<T> implements Message<T> {
 
     @Override
     public byte[] getKeyBytes() {
-        checkNotNull(msgMetadataBuilder);
-        if (hasBase64EncodedKey()) {
+        if (!msgMetadata.hasPartitionKey() || msgMetadata.isNullPartitionKey()) {
+            return null;
+        } else if (hasBase64EncodedKey()) {
             return Base64.getDecoder().decode(getKey());
         } else {
             return getKey().getBytes(UTF_8);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 647a87b..41b1260 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -21,12 +21,14 @@ package org.apache.pulsar.client.impl.schema;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 import java.util.concurrent.ExecutionException;
 
@@ -247,10 +249,14 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
         if (this.schema == null) {
             throw new IllegalStateException("Cannot decode a message without schema");
         }
-        return GenericObjectWrapper.of(value,
-                this.schema.getSchemaInfo().getType(), schemaVersion);
+        return wrapPrimitiveObject(value, schema.getSchemaInfo().getType(), schemaVersion);
     }
 
+    public static GenericRecord wrapPrimitiveObject(Object value, SchemaType type, byte[] schemaVersion) {
+        return GenericObjectWrapper.of(value, type, schemaVersion);
+    }
+
+
     public Schema<?> getInternalSchema() {
         return schema;
     }