You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/26 05:34:26 UTC

[pulsar] branch master updated: Clear-out old inbound cnx stats when repl producer disconnects (#3250)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e4f769  Clear-out old inbound cnx stats when repl producer disconnects (#3250)
1e4f769 is described below

commit 1e4f7695972c2ccd2f2b9566e5b9eefeec650b5f
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Dec 25 21:34:21 2018 -0800

    Clear-out old inbound cnx stats when repl producer disconnects (#3250)
    
    ### Motivation
    
    Right now, when remote replication-producer gets disconnected due to below `ProducerBlockedQuotaExceededError`, broker doesn't cleanup inbound repl-producer stats for the topic and it shows invalid stale inbound cnx stats until new producer gets created successfully.
    
    ```
    20:10:28.628 [pulsar-io-21-2] WARN  org.apache.pulsar.broker.service.AbstractReplicator - [persistent://localsearch/global/ns/t1][east1 -> west1] Failed to create remote producer (org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError: Cannot create producer on topic with backlog quota exceeded), retrying in 5
    ```
    
    ### Modifications
    
    Clear-out old stale inbound-repl stats.
    
    ### Result
    
    Broker will not show invalid stale repl-stats.
---
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cc73d0c..77bd3ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1211,12 +1211,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
             // Add incoming msg rates
             PublisherStats pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
-            if (pubStats != null) {
-                rStat.msgRateIn = pubStats.msgRateIn;
-                rStat.msgThroughputIn = pubStats.msgThroughputIn;
-                rStat.inboundConnection = pubStats.getAddress();
-                rStat.inboundConnectedSince = pubStats.getConnectedSince();
-            }
+            rStat.msgRateIn = pubStats != null ? pubStats.msgRateIn : 0;
+            rStat.msgThroughputIn = pubStats != null ? pubStats.msgThroughputIn : 0;
+            rStat.inboundConnection = pubStats != null ? pubStats.getAddress() : null;
+            rStat.inboundConnectedSince = pubStats != null ? pubStats.getConnectedSince() : null;
 
             topicStatsHelper.aggMsgRateOut += rStat.msgRateOut;
             topicStatsHelper.aggMsgThroughputOut += rStat.msgThroughputOut;