You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/09/27 18:32:54 UTC

[pulsar] branch master updated: Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 43ad6f951b6 Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)
43ad6f951b6 is described below

commit 43ad6f951b6567dd2c4b015d602fa3316f45a74f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Sep 27 21:32:45 2022 +0300

    Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)
    
    * Skip creating a replication snapshot if no messages have been published
    
    * Adapt test to new behavior where replication snapshots happen only when there are new messages
---
 .../ReplicatedSubscriptionsController.java         |  3 +-
 .../broker/service/ReplicatorSubscriptionTest.java | 55 ++++++++++++++++------
 2 files changed, 42 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index a77ac76be8f..1e1245ed36b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -202,7 +202,8 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     private void startNewSnapshot() {
         cleanupTimedOutSnapshots();
 
-        if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
+        if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime
+                || topic.getLastDataMessagePublishedTimestamp() == 0) {
             // No message was written since the last snapshot (or since the topic was activated); skip the snapshot
             if (log.isDebugEnabled()) {
                 log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 9c426b2a435..ea9a9a2b9f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
@@ -174,6 +175,14 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
         // create subscription in r1
         createReplicatedSubscription(client1, topicName, subscriptionName, true);
 
+        // Validate that no snapshots are created before messages are published
+        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+        PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
+                .getTopic(topicName, false).get().get();
+        ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
+        // no snapshot should have been created before any messages are published
+        assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty());
+
         @Cleanup
         PulsarClient client2 = PulsarClient.builder()
                 .serviceUrl(url2.toString())
@@ -197,9 +206,6 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
         Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
 
         // In R1
-        PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
-                .getTopic(topicName, false).get().get();
-        ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
         Position p1 = t1.getLastPosition();
         String snapshot1 = rsc1.getLastCompletedSnapshotId().get();
 
@@ -541,22 +547,35 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .build();
 
-        // create consumer in r1
-        @Cleanup
-        Consumer<byte[]> consumer1 = client1.newConsumer()
-                .topic(topicName)
-                .subscriptionName(subscriptionName)
-                .replicateSubscriptionState(true)
-                .subscribe();
+        {
+            // create consumer in r1
+            @Cleanup
+            Consumer<byte[]> consumer = client1.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(subscriptionName)
+                    .replicateSubscriptionState(true)
+                    .subscribe();
 
-        // waiting to replicate topic/subscription to r1->r2
-        Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
-        final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
-        Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
-        Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
+            // send one message to trigger replication
+            @Cleanup
+            Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                    .enableBatching(false)
+                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                    .create();
+            producer.send("message".getBytes(StandardCharsets.UTF_8));
+
+            assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);
+
+            // waiting to replicate topic/subscription to r1->r2
+            Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
+            final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
+            Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
+        }
 
         // unsubscribe replicated subscription in r2
         admin2.topics().deleteSubscription(topicName, subscriptionName);
+        final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
         assertNull(topic2.getSubscription(subscriptionName));
 
         // close replicator producer in r2
@@ -581,6 +600,12 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
 
         // consume 6 messages in r1
         Set<String> receivedMessages = new LinkedHashSet<>();
+        @Cleanup
+        Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(true)
+                .subscribe();
         assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages);
 
         // wait for subscription to be replicated