You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/09/27 18:32:54 UTC
[pulsar] branch master updated: Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 43ad6f951b6 Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)
43ad6f951b6 is described below
commit 43ad6f951b6567dd2c4b015d602fa3316f45a74f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Sep 27 21:32:45 2022 +0300
Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618)
* Skip creating a replication snapshot if no messages have been published
* Adapt test to new behavior where replication snapshots happen only when there are new messages
---
.../ReplicatedSubscriptionsController.java | 3 +-
.../broker/service/ReplicatorSubscriptionTest.java | 55 ++++++++++++++++------
2 files changed, 42 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index a77ac76be8f..1e1245ed36b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -202,7 +202,8 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
private void startNewSnapshot() {
cleanupTimedOutSnapshots();
- if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
+ if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime
+ || topic.getLastDataMessagePublishedTimestamp() == 0) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 9c426b2a435..ea9a9a2b9f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -174,6 +175,14 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
// create subscription in r1
createReplicatedSubscription(client1, topicName, subscriptionName, true);
+ // Validate that no snapshots are created before messages are published
+ Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+ PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
+ .getTopic(topicName, false).get().get();
+ ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
+ // no snapshot should have been created before any messages are published
+ assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty());
+
@Cleanup
PulsarClient client2 = PulsarClient.builder()
.serviceUrl(url2.toString())
@@ -197,9 +206,6 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
// In R1
- PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
- .getTopic(topicName, false).get().get();
- ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
Position p1 = t1.getLastPosition();
String snapshot1 = rsc1.getLastCompletedSnapshotId().get();
@@ -541,22 +547,35 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
.statsInterval(0, TimeUnit.SECONDS)
.build();
- // create consumer in r1
- @Cleanup
- Consumer<byte[]> consumer1 = client1.newConsumer()
- .topic(topicName)
- .subscriptionName(subscriptionName)
- .replicateSubscriptionState(true)
- .subscribe();
+ {
+ // create consumer in r1
+ @Cleanup
+ Consumer<byte[]> consumer = client1.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .replicateSubscriptionState(true)
+ .subscribe();
- // waiting to replicate topic/subscription to r1->r2
- Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
- final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
- Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
- Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
+ // send one message to trigger replication
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ producer.send("message".getBytes(StandardCharsets.UTF_8));
+
+ assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);
+
+ // waiting to replicate topic/subscription to r1->r2
+ Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
+ final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+ Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
+ Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
+ }
// unsubscribe replicated subscription in r2
admin2.topics().deleteSubscription(topicName, subscriptionName);
+ final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertNull(topic2.getSubscription(subscriptionName));
// close replicator producer in r2
@@ -581,6 +600,12 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
// consume 6 messages in r1
Set<String> receivedMessages = new LinkedHashSet<>();
+ @Cleanup
+ Consumer<byte[]> consumer1 = client1.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .replicateSubscriptionState(true)
+ .subscribe();
assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages);
// wait for subscription to be replicated