You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/06 16:46:22 UTC
[pulsar] branch master updated: Fix NPE when TableView handles null value message (#15951)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 722c56dfcad Fix NPE when TableView handles null value message (#15951)
722c56dfcad is described below
commit 722c56dfcad97b46f8483b1729821a794f9e7426
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Tue Jun 7 00:46:14 2022 +0800
Fix NPE when TableView handles null value message (#15951)
---
.../apache/pulsar/client/impl/TableViewTest.java | 57 ++++++++++++++++++----
.../apache/pulsar/client/impl/TableViewImpl.java | 6 ++-
2 files changed, 53 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index dc039f225b9..20f510e97e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.client.impl;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.HashSet;
@@ -40,7 +43,6 @@ import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.fail;
/**
* Unit test for {@link org.apache.pulsar.client.impl.TableViewImpl}.
@@ -113,18 +115,18 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
- Assert.assertEquals(tv.size(), count);
+ assertEquals(tv.size(), count);
});
- Assert.assertEquals(tv.keySet(), keys);
+ assertEquals(tv.keySet(), keys);
tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
// Send more data
Set<String> keys2 = this.publishMessages(topic, count * 2, false);
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
- Assert.assertEquals(tv.size(), count * 2);
+ assertEquals(tv.size(), count * 2);
});
- Assert.assertEquals(tv.keySet(), keys2);
+ assertEquals(tv.keySet(), keys2);
// Test collection
try {
tv.keySet().clear();
@@ -161,9 +163,9 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
- Assert.assertEquals(tv.size(), count);
+ assertEquals(tv.size(), count);
});
- Assert.assertEquals(tv.keySet(), keys);
+ assertEquals(tv.keySet(), keys);
tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
admin.topics().updatePartitionedTopic(topic, 4);
@@ -174,8 +176,45 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
- Assert.assertEquals(tv.size(), count * 2);
+ assertEquals(tv.size(), count * 2);
});
- Assert.assertEquals(tv.keySet(), keys2);
+ assertEquals(tv.keySet(), keys2);
+ }
+
+ /**
+ * Publishes a value for a key, then a null value for the same key, and checks that
+ * the table view drops the key instead of failing. Afterwards a second table view is
+ * created on the same topic and is expected to contain only the remaining key.
+ */
+ @Test(timeOut = 30 * 1000)
+ public void testPublishNullValue() throws Exception {
+ String topic = "persistent://public/default/tableview-test-publish-null-value";
+ admin.topics().createPartitionedTopic(topic, 3);
+
+ final TableView<String> tv = pulsarClient.newTableViewBuilder(Schema.STRING)
+ .topic(topic)
+ .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ .create();
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+ producer.newMessage().key("key1").value("value1").send();
+
+ Awaitility.await().untilAsserted(() -> assertEquals(tv.get("key1"), "value1"));
+ assertEquals(tv.size(), 1);
+
+ // Try to remove key1 by publishing a tombstone (null value) message.
+ producer.newMessage().key("key1").value(null).send();
+ Awaitility.await().untilAsserted(() -> assertEquals(tv.size(), 0));
+
+ producer.newMessage().key("key2").value("value2").send();
+ Awaitility.await().untilAsserted(() -> assertEquals(tv.get("key2"), "value2"));
+ assertEquals(tv.size(), 1);
+
+ tv.close();
+
+ // A newly created view over the same topic must end up with key2 only.
+ @Cleanup
+ TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
+ .topic(topic)
+ .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ .create();
+
+ assertEquals(tv1.size(), 1);
+ // Assert on the new view (tv1); the original view (tv) was closed above.
+ assertEquals(tv1.get("key2"), "value2");
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 02019ae0cfa..652303ddf71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -155,7 +155,11 @@ public class TableViewImpl<T> implements TableView<T> {
try {
listenersMutex.lock();
- data.put(msg.getKey(), msg.getValue());
+ if (null == msg.getValue()){
+ data.remove(msg.getKey());
+ } else {
+ data.put(msg.getKey(), msg.getValue());
+ }
for (BiConsumer<String, T> listener : listeners) {
try {