You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/23 19:05:45 UTC

[pulsar] branch master updated: Fix TableViewImpl not retrying partition update on exceptions (#14408)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe2f41  Fix TableViewImpl not retrying partition update on exceptions (#14408)
9fe2f41 is described below

commit 9fe2f418200f4231326fc273e5671c5536b9bf65
Author: Ziyao Wei <zi...@gmail.com>
AuthorDate: Wed Feb 23 14:03:42 2022 -0500

    Fix TableViewImpl not retrying partition update on exceptions (#14408)
    
    * Fix TableViewImpl not retrying partition update on exceptions
    
    * Log exception and undo whitespace changes
    
    * Change topic name in unit test
---
 .../apache/pulsar/client/impl/TableViewTest.java   | 57 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/TableViewImpl.java   |  3 +-
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 9d235ee..4f0b704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -30,12 +30,16 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -158,4 +162,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         });
         Assert.assertEquals(tv.keySet(), keys2);
     }
+
+
+    @Test(timeOut = 30 * 1000)
+    // Regression test for making sure partition changes are always periodically checked even after a check returned
+    // exceptionally.
+    public void testTableViewUpdatePartitionsTriggeredDespiteExceptions() throws Exception {
+        String topic = "persistent://public/default/tableview-test-update-partitions-triggered-despite-exceptions";
+        admin.topics().createPartitionedTopic(topic, 3);
+        int count = 20;
+        Set<String> keys = this.publishMessages(topic, count, false);
+        PulsarClient spyPulsarClient = Mockito.spy(pulsarClient);
+        @Cleanup
+        TableView<byte[]> tv = spyPulsarClient.newTableViewBuilder(Schema.BYTES)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+        log.info("start tv size: {}", tv.size());
+        tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
+        Awaitility.await().untilAsserted(() -> {
+            log.info("Current tv size: {}", tv.size());
+            Assert.assertEquals(tv.size(), count);
+        });
+        Assert.assertEquals(tv.keySet(), keys);
+        tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
+
+        // Let update partition check throw an exception
+        Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("")))
+                .when(spyPulsarClient)
+                .getPartitionsForTopic(Mockito.any());
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        TopicName topicName = TopicName.get(topic);
+
+        // Make sure the get partitions callback is called; it should throw an exception
+        Mockito.verify(spyPulsarClient).getPartitionsForTopic(Mockito.any());
+
+        // Send more data to partition 3, which is not in the current TableView, need update partitions
+        Set<String> keys2 =
+                this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
+
+        // Wait for 10 seconds; verify that the messages haven't arrived, which would have happened if the partitions
+        // has been updated
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertEquals(tv.size(), count);
+
+        // Let update partition check succeed, and check the messages eventually arrives
+        Mockito.doCallRealMethod().when(spyPulsarClient).getPartitionsForTopic(Mockito.any());
+        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+            log.info("Current tv size: {}", tv.size());
+            Assert.assertEquals(tv.size(), count * 2);
+        });
+        Assert.assertEquals(tv.keySet(), keys2);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index ab9bf11..e767156 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -105,7 +105,8 @@ public class TableViewImpl<T> implements TableView<T> {
 
         start().whenComplete((tw, ex) -> {
            if (ex != null) {
-               log.warn("Failed to check for changes in number of partitions");
+               log.warn("Failed to check for changes in number of partitions: {}", ex);
+               schedulePartitionsCheck();
            }
         });
     }