You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/06 16:46:22 UTC

[pulsar] branch master updated: Fix NPE when TableView handles null value message (#15951)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 722c56dfcad Fix NPE when TableView handles null value message (#15951)
722c56dfcad is described below

commit 722c56dfcad97b46f8483b1729821a794f9e7426
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Tue Jun 7 00:46:14 2022 +0800

    Fix NPE when TableView handles null value message (#15951)
---
 .../apache/pulsar/client/impl/TableViewTest.java   | 57 ++++++++++++++++++----
 .../apache/pulsar/client/impl/TableViewImpl.java   |  6 ++-
 2 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index dc039f225b9..20f510e97e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
 import com.google.common.collect.Sets;
 import java.time.Duration;
 import java.util.HashSet;
@@ -40,7 +43,6 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static org.testng.Assert.fail;
 
 /**
  * Unit test for {@link org.apache.pulsar.client.impl.TableViewImpl}.
@@ -113,18 +115,18 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
         Awaitility.await().untilAsserted(() -> {
             log.info("Current tv size: {}", tv.size());
-            Assert.assertEquals(tv.size(), count);
+            assertEquals(tv.size(), count);
         });
-        Assert.assertEquals(tv.keySet(), keys);
+        assertEquals(tv.keySet(), keys);
         tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
 
         // Send more data
         Set<String> keys2 = this.publishMessages(topic, count * 2, false);
         Awaitility.await().untilAsserted(() -> {
             log.info("Current tv size: {}", tv.size());
-            Assert.assertEquals(tv.size(), count * 2);
+            assertEquals(tv.size(), count * 2);
         });
-        Assert.assertEquals(tv.keySet(), keys2);
+        assertEquals(tv.keySet(), keys2);
         // Test collection
         try {
             tv.keySet().clear();
@@ -161,9 +163,9 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
         Awaitility.await().untilAsserted(() -> {
             log.info("Current tv size: {}", tv.size());
-            Assert.assertEquals(tv.size(), count);
+            assertEquals(tv.size(), count);
         });
-        Assert.assertEquals(tv.keySet(), keys);
+        assertEquals(tv.keySet(), keys);
         tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
 
         admin.topics().updatePartitionedTopic(topic, 4);
@@ -174,8 +176,45 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
                 this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
         Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
             log.info("Current tv size: {}", tv.size());
-            Assert.assertEquals(tv.size(), count * 2);
+            assertEquals(tv.size(), count * 2);
         });
-        Assert.assertEquals(tv.keySet(), keys2);
+        assertEquals(tv.keySet(), keys2);
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testPublishNullValue() throws Exception {
+        String topic = "persistent://public/default/tableview-test-publish-null-value";
+        admin.topics().createPartitionedTopic(topic, 3);
+
+        final TableView<String> tv = pulsarClient.newTableViewBuilder(Schema.STRING)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        producer.newMessage().key("key1").value("value1").send();
+
+        Awaitility.await().untilAsserted(() -> assertEquals(tv.get("key1"), "value1"));
+        assertEquals(tv.size(), 1);
+
+        // Try to remove key1 by publishing the tombstones message.
+        producer.newMessage().key("key1").value(null).send();
+        Awaitility.await().untilAsserted(() -> assertEquals(tv.size(), 0));
+
+        producer.newMessage().key("key2").value("value2").send();
+        Awaitility.await().untilAsserted(() -> assertEquals(tv.get("key2"), "value2"));
+        assertEquals(tv.size(), 1);
+
+        tv.close();
+
+        @Cleanup
+        TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+
+        assertEquals(tv1.size(), 1);
+        assertEquals(tv.get("key2"), "value2");
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 02019ae0cfa..652303ddf71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -155,7 +155,11 @@ public class TableViewImpl<T> implements TableView<T> {
 
                 try {
                     listenersMutex.lock();
-                    data.put(msg.getKey(), msg.getValue());
+                    if (null == msg.getValue()){
+                        data.remove(msg.getKey());
+                    } else {
+                        data.put(msg.getKey(), msg.getValue());
+                    }
 
                     for (BiConsumer<String, T> listener : listeners) {
                         try {