You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/17 19:03:48 UTC

[pulsar] branch master updated: GenericObject: add more test cases about KeyValue and NULL values (#10609)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e552d68  GenericObject: add more test cases about KeyValue and NULL values (#10609)
e552d68 is described below

commit e552d6893ab880d10b350aa7e3ec89e4d12b3640
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon May 17 21:02:49 2021 +0200

    GenericObject: add more test cases about KeyValue and NULL values (#10609)
    
    Co-authored-by: Enrico Olivelli <eo...@apache.org>
---
 .../apache/pulsar/client/api/SimpleSchemaTest.java | 63 ++++++++++++++++++++--
 1 file changed, 59 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index fe0dc1b..16a16c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -28,6 +28,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -267,7 +268,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             Message<V2Data> msg1 = c.receive();
             V2Data msg1Value = msg1.getValue();
             Assert.assertEquals(dataV1.i, msg1Value.i);
-            Assert.assertNull(msg1Value.j);
+            assertNull(msg1Value.j);
             Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
 
             Message<V2Data> msg2 = c.receive();
@@ -310,7 +311,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             Message<V2Data> msg1 = c.receive();
             V2Data msg1Value = msg1.getValue();
             Assert.assertEquals(data1.i, msg1Value.i);
-            Assert.assertNull(msg1Value.j);
+            assertNull(msg1Value.j);
             Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
 
             Message<V2Data> msg2 = c.receive();
@@ -320,7 +321,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             Message<V2Data> msg3 = c.receive();
             V2Data msg3Value = msg3.getValue();
             Assert.assertEquals(data3.i, msg3Value.i);
-            Assert.assertNull(msg3Value.j);
+            assertNull(msg3Value.j);
             Assert.assertEquals(msg3.getSchemaVersion(), new LongSchemaVersion(0).bytes());
         }
     }
@@ -405,7 +406,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         for (int i = 0; i < total; ++i) {
             V2Data value = c.receive().getValue();
             if (i / batch % 2 == 0) {
-                Assert.assertNull(value.j);
+                assertNull(value.j);
                 Assert.assertEquals(value.i, i);
             } else {
                 Assert.assertEquals(value, new V2Data(i, i + total));
@@ -955,4 +956,58 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         // topic2's schema becomes STRING now.
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
+
+    @DataProvider(name = "keyEncodingType") // feeds tests that declare dataProvider = "keyEncodingType"
+    public static Object[] keyEncodingType() {
+        return new Object[] { KeyValueEncodingType.SEPARATED, KeyValueEncodingType.INLINE };
+    }
+
+    /**
+     * Publishes AVRO KeyValue messages covering every null/non-null key and value combination and
+     * verifies that an AUTO_CONSUME consumer exposes each one as a KeyValue of GenericRecord.
+     */
+    @Test(dataProvider = "keyEncodingType")
+    public void testAutoKeyValueConsumeGenericObjectNullValues(KeyValueEncodingType encodingType) throws Exception {
+        String topic = "my-property/my-ns/schema-test-auto-keyvalue-" + encodingType + "-null-value-consume-" + UUID.randomUUID();
+        Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
+                Schema.AVRO(V1Data.class),
+                Schema.AVRO(V1Data.class),
+                encodingType);
+
+        try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
+                .topic(topic)
+                .create();
+             Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                     .topic(topic)
+                     .subscriptionName("sub0")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe()) {
+            p.send(new KeyValue<>(new V1Data(1), new V1Data(2)));
+            p.send(new KeyValue<>(new V1Data(1), null));
+            p.send(new KeyValue<>(null, new V1Data(2)));
+            p.send(new KeyValue<>(null, null));
+
+            // TestNG's assertEquals takes (actual, expected); keep that order so failure messages are accurate.
+            Message<GenericRecord> wrapper = c0.receive();
+            assertEquals(((KeyValueSchema<?, ?>) wrapper.getReaderSchema().get()).getKeyValueEncodingType(), encodingType);
+            KeyValue<GenericRecord, GenericRecord> data1 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+            assertEquals(data1.getKey().getField("i"), 1);
+            assertEquals(data1.getValue().getField("i"), 2);
+
+            wrapper = c0.receive();
+            KeyValue<GenericRecord, GenericRecord> data2 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+            assertEquals(data2.getKey().getField("i"), 1);
+            assertNull(data2.getValue());
+
+            wrapper = c0.receive();
+            KeyValue<GenericRecord, GenericRecord> data3 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+            assertNull(data3.getKey());
+            assertEquals(data3.getValue().getField("i"), 2);
+
+            wrapper = c0.receive();
+            KeyValue<GenericRecord, GenericRecord> data4 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+            assertNull(data4.getKey());
+            assertNull(data4.getValue());
+        }
+    }
 }