You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2022/02/14 19:17:33 UTC

[bookkeeper] branch master updated: Replication stat num-under-replicated-ledgers changed as with the process of replication

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fd35f9  Replication stat num-under-replicated-ledgers changed as with the process of replication
9fd35f9 is described below

commit 9fd35f9cefa1b5925dc29e2e734cc5be9df346f1
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 15 03:17:28 2022 +0800

    Replication stat num-under-replicated-ledgers changed as with the process of replication
    
    Motivation
    Currently the ReplicationStats `numUnderReplicatedLedger` stat is only recorded in `publishSuspectedLedgersAsync`. Its value does not decrease as ledgers are successfully replicated, so the stat cannot be used to track the progress of replication.
    
    Changes
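    The auditor registers a `notifyUnderReplicationLedgerChanged` watcher when it starts. Whenever an under-replicated ledger znode is deleted, the auditor re-counts the remaining under-replicated ledgers into the new `NUM_UNDER_REPLICATED_LEDGERS_GUAGE` gauge and increments the new `NUM_REPLICATED_LEDGERS` counter. As a result, the reported number of under-replicated ledgers decreases as replication proceeds.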
    
    Reviewers: Nicolò Boschi <bo...@gmail.com>, Enrico Olivelli <eo...@gmail.com>, Andrey Yegorov <None>
    
    This closes #2805 from gaozhangmin/replication-stats-num-under-replicated-ledgers
---
 .../meta/LedgerUnderreplicationManager.java        |  9 +++++
 .../bookkeeper/meta/NullMetadataBookieDriver.java  |  2 +
 .../meta/ZkLedgerUnderreplicationManager.java      | 23 +++++++++++
 .../org/apache/bookkeeper/replication/Auditor.java | 47 ++++++++++++++++++++++
 .../bookkeeper/replication/ReplicationStats.java   |  2 +
 5 files changed, 83 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index ac468d3..c24f920 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -237,6 +237,15 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
     long getReplicasCheckCTime() throws ReplicationException.UnavailableException;
 
     /**
+     * Receive notification asynchronously when the number of under-replicated ledgers changes.
+     *
+     * @param cb
+     * @throws ReplicationException.UnavailableException
+     */
+    void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb)
+            throws ReplicationException.UnavailableException;
+
+    /**
      * Receive notification asynchronously when the lostBookieRecoveryDelay value is Changed.
      *
      * @param cb
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java
index 3a4247f..4a1e6e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java
@@ -394,5 +394,7 @@ public class NullMetadataBookieDriver implements MetadataBookieDriver {
                 throws ReplicationException.UnavailableException {
             throw new ReplicationException.UnavailableException("null");
         }
+        @Override
+        public void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb) {}
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index b340e07..9dcc81c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.SubTreeCache;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -867,6 +868,28 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     }
 
     @Override
+    public void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb) throws UnavailableException {
+        LOG.debug("notifyUnderReplicationLedgerChanged()");
+        Watcher w = new Watcher() {
+            @Override
+            public void process(WatchedEvent e) {
+                if (e.getType() == Event.EventType.NodeDeleted && idExtractionPattern.matcher(e.getPath()).find()) {
+                    cb.operationComplete(0, null);
+                }
+            }
+        };
+        try {
+            zkc.addWatch(urLedgerPath, w, AddWatchMode.PERSISTENT_RECURSIVE);
+        } catch (KeeperException ke) {
+            LOG.error("Error while checking the state of underReplicated ledgers", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+    }
+
+    @Override
     public void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb) throws UnavailableException {
         LOG.debug("notifyLostBookieRecoveryDelayChanged()");
         Watcher w = new Watcher() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index cd85145..c3b69dd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -34,8 +34,10 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAV
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_REPLICATED_LEDGERS;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS_GUAGE;
 import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
 import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
 import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE;
@@ -45,6 +47,7 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
@@ -162,6 +165,7 @@ public class Auditor implements AutoCloseable {
     private final AtomicInteger numLedgersFoundHavingLessThanAQReplicasOfAnEntry;
     private final AtomicInteger numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue;
     private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
+    private final AtomicInteger underReplicatedLedgersGuageValue;
     private final long underreplicatedLedgerRecoveryGracePeriod;
     private final int zkOpTimeoutMs;
     private final Semaphore openLedgerNoRecoverySemaphore;
@@ -235,6 +239,11 @@ public class Auditor implements AutoCloseable {
     )
     private final Counter numDelayedBookieAuditsCancelled;
     @StatsDoc(
+            name = NUM_REPLICATED_LEDGERS,
+            help = "the number of replicated ledgers"
+    )
+    private final Counter numReplicatedLedgers;
+    @StatsDoc(
             name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
             help = "Gauge for number of ledgers not adhering to placement policy found in placement policy check"
     )
@@ -266,6 +275,11 @@ public class Auditor implements AutoCloseable {
                     + ", this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry"
     )
     private final Gauge<Integer> numLedgersHavingLessThanWQReplicasOfAnEntry;
+    @StatsDoc(
+            name = NUM_UNDER_REPLICATED_LEDGERS_GUAGE,
+            help = "Gauge for num of underreplicated ledgers"
+    )
+    private final Gauge<Integer> numUnderReplicatedLedgers;
 
     static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
         return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
@@ -363,6 +377,7 @@ public class Auditor implements AutoCloseable {
         this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
             conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
 
+        this.underReplicatedLedgersGuageValue = new AtomicInteger(0);
         numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
         underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
         uRLPublishTimeForLostBookies = this.statsLogger
@@ -379,6 +394,7 @@ public class Auditor implements AutoCloseable {
         numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
         numDelayedBookieAuditsCancelled = this.statsLogger
                 .getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
+        numReplicatedLedgers = this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS);
         numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
             @Override
             public Integer getDefaultValue() {
@@ -459,6 +475,18 @@ public class Auditor implements AutoCloseable {
         };
         this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
                 numLedgersHavingLessThanWQReplicasOfAnEntry);
+        numUnderReplicatedLedgers = new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                return underReplicatedLedgersGuageValue.get();
+            }
+        };
+        this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE, numUnderReplicatedLedgers);
 
         this.bkc = bkc;
         this.ownBkc = ownBkc;
@@ -703,6 +731,15 @@ public class Auditor implements AutoCloseable {
                 submitShutdownTask();
             }
 
+            try {
+                this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
+                        new UnderReplicatedLedgersChangedCb());
+            } catch (UnavailableException ue) {
+                LOG.error("Exception while registering for under-replicated ledgers change notification, so exiting",
+                        ue);
+                submitShutdownTask();
+            }
+
             scheduleBookieCheckTask();
             scheduleCheckAllLedgersTask();
             schedulePlacementPolicyCheckTask();
@@ -1010,6 +1047,16 @@ public class Auditor implements AutoCloseable {
         }), initialDelay, interval, TimeUnit.SECONDS);
     }
 
+    private class UnderReplicatedLedgersChangedCb implements GenericCallback<Void> {
+        @Override
+        public void operationComplete(int rc, Void result) {
+            Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = ledgerUnderreplicationManager
+                    .listLedgersToRereplicate(null);
+            underReplicatedLedgersGuageValue.set(Iterators.size(underreplicatedLedgersInfo));
+            numReplicatedLedgers.inc();
+        }
+    }
+
     private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
         @Override
         public void operationComplete(int rc, Void result) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index d04fcf0..74b76b2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -64,4 +64,6 @@ public interface ReplicationStats {
     String REPLICATE_EXCEPTION = "exceptions";
     String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
     String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
+    String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE";
+    String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS";
 }