You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/12 06:07:53 UTC

[pulsar] branch branch-2.10 updated (6441714fb46 -> f056d9dcdc8)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 6441714fb46 [cherry-pick][branch-2.10] fix code style and run test (#18863)
     new eac65c0f70b Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)
     new f056d9dcdc8 [cherry-pick][branch-2.10] fix test after cherry-pick #16618

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ReplicatedSubscriptionsController.java         |  3 +-
 .../broker/service/ReplicatorSubscriptionTest.java | 55 ++++++++++++++++------
 2 files changed, 42 insertions(+), 16 deletions(-)


[pulsar] 02/02: [cherry-pick][branch-2.10] fix test after cherry-pick #16618

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f056d9dcdc8d682d76b8c516a11ed46764a11d16
Author: xiangying <19...@qq.com>
AuthorDate: Mon Dec 12 14:07:34 2022 +0800

    [cherry-pick][branch-2.10] fix test after cherry-pick #16618
---
 .../org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index ea9a9a2b9f2..046adaa5ec2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -181,7 +181,7 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
                 .getTopic(topicName, false).get().get();
         ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
         // no snapshot should have been created before any messages are published
-        assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty());
+        assertFalse(rsc1.getLastCompletedSnapshotId().isPresent());
 
         @Cleanup
         PulsarClient client2 = PulsarClient.builder()


[pulsar] 01/02: Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eac65c0f70b33f3a27a209985c2cde455c73352c
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Sep 27 21:32:45 2022 +0300

    Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)
    
    * Skip creating a replication snapshot if no messages have been published
    
    * Adapt test to new behavior where replication snapshots happen only when there are new messages
    
    (cherry picked from commit 43ad6f951b6567dd2c4b015d602fa3316f45a74f)
---
 .../ReplicatedSubscriptionsController.java         |  3 +-
 .../broker/service/ReplicatorSubscriptionTest.java | 55 ++++++++++++++++------
 2 files changed, 42 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index a77ac76be8f..1e1245ed36b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -202,7 +202,8 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     private void startNewSnapshot() {
         cleanupTimedOutSnapshots();
 
-        if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
+        if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime
+                || topic.getLastDataMessagePublishedTimestamp() == 0) {
             // There was no message written since the last snapshot, we can skip creating a new snapshot
             if (log.isDebugEnabled()) {
                 log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 9c426b2a435..ea9a9a2b9f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
@@ -174,6 +175,14 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
         // create subscription in r1
         createReplicatedSubscription(client1, topicName, subscriptionName, true);
 
+        // Validate that no snapshots are created before messages are published
+        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+        PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
+                .getTopic(topicName, false).get().get();
+        ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
+        // no snapshot should have been created before any messages are published
+        assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty());
+
         @Cleanup
         PulsarClient client2 = PulsarClient.builder()
                 .serviceUrl(url2.toString())
@@ -197,9 +206,6 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
         Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
 
         // In R1
-        PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
-                .getTopic(topicName, false).get().get();
-        ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
         Position p1 = t1.getLastPosition();
         String snapshot1 = rsc1.getLastCompletedSnapshotId().get();
 
@@ -541,22 +547,35 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .build();
 
-        // create consumer in r1
-        @Cleanup
-        Consumer<byte[]> consumer1 = client1.newConsumer()
-                .topic(topicName)
-                .subscriptionName(subscriptionName)
-                .replicateSubscriptionState(true)
-                .subscribe();
+        {
+            // create consumer in r1
+            @Cleanup
+            Consumer<byte[]> consumer = client1.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(subscriptionName)
+                    .replicateSubscriptionState(true)
+                    .subscribe();
 
-        // waiting to replicate topic/subscription to r1->r2
-        Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
-        final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
-        Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
-        Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
+            // send one message to trigger replication
+            @Cleanup
+            Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                    .enableBatching(false)
+                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                    .create();
+            producer.send("message".getBytes(StandardCharsets.UTF_8));
+
+            assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);
+
+            // waiting to replicate topic/subscription to r1->r2
+            Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
+            final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
+            Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
+        }
 
         // unsubscribe replicated subscription in r2
         admin2.topics().deleteSubscription(topicName, subscriptionName);
+        final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
         assertNull(topic2.getSubscription(subscriptionName));
 
         // close replicator producer in r2
@@ -581,6 +600,12 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
 
         // consume 6 messages in r1
         Set<String> receivedMessages = new LinkedHashSet<>();
+        @Cleanup
+        Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(true)
+                .subscribe();
         assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages);
 
         // wait for subscription to be replicated