You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/04/08 01:27:10 UTC

[pulsar] branch master updated: [pulsar-broker] check replicator periodically to avoid issue due to zk missing watch (#6674)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f1673b  [pulsar-broker] check replicator periodically to avoid issue due to zk missing watch (#6674)
0f1673b is described below

commit 0f1673b79fb5b4817af2f0d8d59dfebd59465778
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 7 18:27:00 2020 -0700

    [pulsar-broker] check replicator periodically to avoid issue due to zk missing watch (#6674)
    
    ### Motivation
    We regularly see an issue when a user changes the replication-cluster list: the broker often misses the zk watch and fails to update its replicators, which causes either data loss or message backlog, depending on whether the zk watch was missed at the source or the destination replication broker.
    
    ### Modification
    - Broker expires replication policies every X seconds
    - Starts a background task which checks policies (fetching new policies if they have already expired) and starts/stops replicators if needed.
    
    ### Result
    The broker can start/stop replicators based on updated policies even if it misses a zk watch.
---
 conf/broker.conf                                            |  4 ++++
 conf/standalone.conf                                        |  4 ++++
 .../java/org/apache/pulsar/broker/ServiceConfiguration.java |  6 ++++++
 .../org/apache/pulsar/broker/service/BrokerService.java     | 13 +++++++++++++
 4 files changed, 27 insertions(+)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2ded719..e02e323 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -751,6 +751,10 @@ replicationProducerQueueSize=1000
 # Replicator prefix used for replicator producer name and cursor name
 replicatorPrefix=pulsar.repl
 
+# Duration to check replication policy to avoid replicator inconsistency 
+# due to missing ZooKeeper watch (disable with value 0)
+replicatioPolicyCheckDurationSeconds=600
+
 # Default message retention time
 defaultRetentionTimeInMinutes=0
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 9b3b615..4095d88 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -502,6 +502,10 @@ replicationConnectionsPerBroker=16
 # Replicator producer queue size
 replicationProducerQueueSize=1000
 
+# Duration to check replication policy to avoid replicator inconsistency 
+# due to missing ZooKeeper watch (disable with value 0)
+replicatioPolicyCheckDurationSeconds=600
+
 # Default message retention time
 defaultRetentionTimeInMinutes=0
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 239442e..3cc068f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1281,6 +1281,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Replicator producer queue size"
     )
     private int replicationProducerQueueSize = 1000;
+    @FieldContext(
+            category = CATEGORY_REPLICATION,
+            doc = "Duration to check replication policy to avoid replicator "
+                    + "inconsistency due to missing ZooKeeper watch (disable with value 0)"
+        )
+    private int replicatioPolicyCheckDurationSeconds = 600;
     @Deprecated
     @FieldContext(
         category = CATEGORY_REPLICATION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e065bbd..a28f3a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -404,6 +404,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         this.startMessagePublishBufferMonitor();
         this.startBacklogQuotaChecker();
         this.updateBrokerPublisherThrottlingMaxRate();
+        this.startCheckReplicationPolicies();
         // register listener to capture zk-latency
         ClientCnxnAspect.addListener(zkStatsListener);
         ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
@@ -446,6 +447,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 TimeUnit.MINUTES);
     }
 
+    protected void startCheckReplicationPolicies() {
+        int interval = pulsar.getConfig().getReplicatioPolicyCheckDurationSeconds();
+        if (interval > 0) {
+            messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies), interval, interval,
+                    TimeUnit.SECONDS);
+        }
+    }
+
     protected void startCompactionMonitor() {
         int interval = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
         if (interval > 0) {
@@ -1143,6 +1152,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         forEachTopic(Topic::checkMessageExpiry);
     }
 
+    public void checkReplicationPolicies() {
+        forEachTopic(Topic::checkReplication);
+    }
+
     public void checkCompaction() {
         forEachTopic((t) -> {
                 if (t instanceof PersistentTopic) {