You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/04/08 01:27:10 UTC
[pulsar] branch master updated: [pulsar-broker] check replicator
periodically to avoid issue due to zk missing watch (#6674)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0f1673b [pulsar-broker] check replicator periodically to avoid issue due to zk missing watch (#6674)
0f1673b is described below
commit 0f1673b79fb5b4817af2f0d8d59dfebd59465778
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 7 18:27:00 2020 -0700
[pulsar-broker] check replicator periodically to avoid issue due to zk missing watch (#6674)
### Motivation
We regularly hit an issue when a user changes the replication-cluster list: the broker often misses the ZooKeeper watch and fails to update the replicator. Depending on whether the watch is missed at the source or the destination replication broker, this causes either data loss or a message backlog.
### Modification
- The broker expires replication policies every X seconds.
- Starts a background task that checks policies (fetching new policies if the current ones have already expired) and starts/stops replicators as needed.
### Result
The broker can start/stop replicators based on updated policies even if it misses a ZooKeeper watch.
---
conf/broker.conf | 4 ++++
conf/standalone.conf | 4 ++++
.../java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++
.../org/apache/pulsar/broker/service/BrokerService.java | 13 +++++++++++++
4 files changed, 27 insertions(+)
diff --git a/conf/broker.conf b/conf/broker.conf
index 2ded719..e02e323 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -751,6 +751,10 @@ replicationProducerQueueSize=1000
# Replicator prefix used for replicator producer name and cursor name
replicatorPrefix=pulsar.repl
+# Duration to check replication policy to avoid replicator inconsistency
+# due to missing ZooKeeper watch (disable with value 0)
+replicatioPolicyCheckDurationSeconds=600
+
# Default message retention time
defaultRetentionTimeInMinutes=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 9b3b615..4095d88 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -502,6 +502,10 @@ replicationConnectionsPerBroker=16
# Replicator producer queue size
replicationProducerQueueSize=1000
+# Duration to check replication policy to avoid replicator inconsistency
+# due to missing ZooKeeper watch (disable with value 0)
+replicatioPolicyCheckDurationSeconds=600
+
# Default message retention time
defaultRetentionTimeInMinutes=0
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 239442e..3cc068f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1281,6 +1281,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Replicator producer queue size"
)
private int replicationProducerQueueSize = 1000;
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Duration to check replication policy to avoid replicator "
+ + "inconsistency due to missing ZooKeeper watch (disable with value 0)"
+ )
+ private int replicatioPolicyCheckDurationSeconds = 600;
@Deprecated
@FieldContext(
category = CATEGORY_REPLICATION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e065bbd..a28f3a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -404,6 +404,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.startMessagePublishBufferMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
+ this.startCheckReplicationPolicies();
// register listener to capture zk-latency
ClientCnxnAspect.addListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
@@ -446,6 +447,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
TimeUnit.MINUTES);
}
+ protected void startCheckReplicationPolicies() {
+ int interval = pulsar.getConfig().getReplicatioPolicyCheckDurationSeconds();
+ if (interval > 0) {
+ messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies), interval, interval,
+ TimeUnit.SECONDS);
+ }
+ }
+
protected void startCompactionMonitor() {
int interval = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
if (interval > 0) {
@@ -1143,6 +1152,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
forEachTopic(Topic::checkMessageExpiry);
}
+ public void checkReplicationPolicies() {
+ forEachTopic(Topic::checkReplication);
+ }
+
public void checkCompaction() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {