You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/23 19:20:26 UTC
[pulsar] 01/02: Fix TableViewImpl not retrying partition update on exceptions (#14408)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b90c87b72d5ec9a6d3f05ead716dc42b804f799b
Author: Ziyao Wei <zi...@gmail.com>
AuthorDate: Wed Feb 23 14:03:42 2022 -0500
Fix TableViewImpl not retrying partition update on exceptions (#14408)
* Fix TableViewImpl not retrying partition update on exceptions
* Log exception and undo whitespace changes
* Change topic name in unit test
---
.../apache/pulsar/client/impl/TableViewTest.java | 57 ++++++++++++++++++++++
.../apache/pulsar/client/impl/TableViewImpl.java | 3 +-
2 files changed, 59 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 9d235ee..4f0b704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -30,12 +30,16 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -158,4 +162,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
});
Assert.assertEquals(tv.keySet(), keys2);
}
+
+
+ @Test(timeOut = 30 * 1000)
+ // Regression test: partition changes must keep being checked periodically even after a check has completed
+ // exceptionally.
+ public void testTableViewUpdatePartitionsTriggeredDespiteExceptions() throws Exception {
+ String topic = "persistent://public/default/tableview-test-update-partitions-triggered-despite-exceptions";
+ admin.topics().createPartitionedTopic(topic, 3);
+ int count = 20;
+ Set<String> keys = this.publishMessages(topic, count, false);
+ PulsarClient spyPulsarClient = Mockito.spy(pulsarClient);
+ @Cleanup
+ TableView<byte[]> tv = spyPulsarClient.newTableViewBuilder(Schema.BYTES)
+ .topic(topic)
+ .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ .create();
+ log.info("start tv size: {}", tv.size());
+ tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
+ Awaitility.await().untilAsserted(() -> {
+ log.info("Current tv size: {}", tv.size());
+ Assert.assertEquals(tv.size(), count);
+ });
+ Assert.assertEquals(tv.keySet(), keys);
+ tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
+
+ // Stub getPartitionsForTopic to return a failed future so that the update-partitions check fails
+ Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("")))
+ .when(spyPulsarClient)
+ .getPartitionsForTopic(Mockito.any());
+
+ admin.topics().updatePartitionedTopic(topic, 4);
+ TopicName topicName = TopicName.get(topic);
+
+ // Make sure getPartitionsForTopic has been called on the spy; it is stubbed above to fail
+ Mockito.verify(spyPulsarClient).getPartitionsForTopic(Mockito.any());
+
+ // Send more data to partition 3, which is not in the current TableView; a partition update is needed to see it
+ Set<String> keys2 =
+ this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
+
+ // Wait for 10 seconds; verify that the messages haven't arrived, which would have happened if the partitions
+ // had been updated
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertEquals(tv.size(), count);
+
+ // Let the update-partitions check succeed again, and check that the messages eventually arrive
+ Mockito.doCallRealMethod().when(spyPulsarClient).getPartitionsForTopic(Mockito.any());
+ Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+ log.info("Current tv size: {}", tv.size());
+ Assert.assertEquals(tv.size(), count * 2);
+ });
+ Assert.assertEquals(tv.keySet(), keys2);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index ab9bf11..e767156 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -105,7 +105,8 @@ public class TableViewImpl<T> implements TableView<T> {
start().whenComplete((tw, ex) -> {
if (ex != null) {
- log.warn("Failed to check for changes in number of partitions");
+ log.warn("Failed to check for changes in number of partitions: {}", ex);
+ schedulePartitionsCheck();
}
});
}