You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/17 19:03:48 UTC
[pulsar] branch master updated: GenericObject: add more test cases
about KeyValue and NULL values (#10609)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e552d68 GenericObject: add more test cases about KeyValue and NULL values (#10609)
e552d68 is described below
commit e552d6893ab880d10b350aa7e3ec89e4d12b3640
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon May 17 21:02:49 2021 +0200
GenericObject: add more test cases about KeyValue and NULL values (#10609)
Co-authored-by: Enrico Olivelli <eo...@apache.org>
---
.../apache/pulsar/client/api/SimpleSchemaTest.java | 63 ++++++++++++++++++++--
1 file changed, 59 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index fe0dc1b..16a16c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -28,6 +28,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -267,7 +268,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
Assert.assertEquals(dataV1.i, msg1Value.i);
- Assert.assertNull(msg1Value.j);
+ assertNull(msg1Value.j);
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
Message<V2Data> msg2 = c.receive();
@@ -310,7 +311,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
Assert.assertEquals(data1.i, msg1Value.i);
- Assert.assertNull(msg1Value.j);
+ assertNull(msg1Value.j);
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
Message<V2Data> msg2 = c.receive();
@@ -320,7 +321,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
Message<V2Data> msg3 = c.receive();
V2Data msg3Value = msg3.getValue();
Assert.assertEquals(data3.i, msg3Value.i);
- Assert.assertNull(msg3Value.j);
+ assertNull(msg3Value.j);
Assert.assertEquals(msg3.getSchemaVersion(), new LongSchemaVersion(0).bytes());
}
}
@@ -405,7 +406,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
for (int i = 0; i < total; ++i) {
V2Data value = c.receive().getValue();
if (i / batch % 2 == 0) {
- Assert.assertNull(value.j);
+ assertNull(value.j);
Assert.assertEquals(value.i, i);
} else {
Assert.assertEquals(value, new V2Data(i, i + total));
@@ -955,4 +956,58 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
// topic2's schema becomes STRING now.
Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
}
+
+ @DataProvider(name = "keyEncodingType")
+ public static Object[] keyEncodingType() {
+ return new Object[] { KeyValueEncodingType.SEPARATED, KeyValueEncodingType.INLINE };
+ }
+
+ @Test(dataProvider = "keyEncodingType")
+ public void testAutoKeyValueConsumeGenericObjectNullValues(KeyValueEncodingType encodingType) throws Exception {
+ String topic = "my-property/my-ns/schema-test-auto-keyvalue-" + encodingType + "-null-value-consume-" + UUID.randomUUID();
+
+ Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
+ Schema.AVRO(V1Data.class),
+ Schema.AVRO(V1Data.class),
+ encodingType);
+
+ try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
+ .topic(topic)
+ .create();
+ Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("sub0")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ ) {
+
+ p.send(new KeyValue<>(new V1Data(1), new V1Data(2)));
+ p.send(new KeyValue<>(new V1Data(1), null));
+ p.send(new KeyValue<>(null, new V1Data(2)));
+ p.send(new KeyValue<>(null, null));
+
+ Message<GenericRecord> wrapper = c0.receive();
+ assertEquals(encodingType, ((KeyValueSchema) wrapper.getReaderSchema().get()).getKeyValueEncodingType());
+ KeyValue<GenericRecord, GenericRecord> data1 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertEquals(1, data1.getKey().getField("i"));
+ assertEquals(2, data1.getValue().getField("i"));
+
+ wrapper = c0.receive();
+ KeyValue<GenericRecord, GenericRecord> data2 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertEquals(1, data2.getKey().getField("i"));
+ assertNull(data2.getValue());
+
+ wrapper = c0.receive();
+ KeyValue<GenericRecord, GenericRecord> data3 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNull(data3.getKey());
+ assertEquals(2, data3.getValue().getField("i"));
+
+ wrapper = c0.receive();
+ KeyValue<GenericRecord, GenericRecord> data4 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNull(data4.getKey());
+ assertNull(data4.getValue());
+
+
+ }
+ }
}