You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2022/02/14 19:17:33 UTC
[bookkeeper] branch master updated: Replication stat num-under-replicated-ledgers changed as with the process of replication
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9fd35f9 Replication stat num-under-replicated-ledgers changed as with the process of replication
9fd35f9 is described below
commit 9fd35f9cefa1b5925dc29e2e734cc5be9df346f1
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 15 03:17:28 2022 +0800
Replication stat num-under-replicated-ledgers changed as with the process of replication
Motivation
Currently the ReplicationStats numUnderReplicatedLedger stat is registered when `publishSuspectedLedgersAsync` runs, but its value does not decrease as ledgers are successfully replicated, so we cannot tell the progress of replication from the stat.
Changes
Register a notifyUnderReplicationLedgerChanged callback when the auditor starts. The numUnderReplicatedLedger value will decrease when an under-replicated ledger's path is deleted.
Reviewers: Nicolò Boschi <bo...@gmail.com>, Enrico Olivelli <eo...@gmail.com>, Andrey Yegorov <None>
This closes #2805 from gaozhangmin/replication-stats-num-under-replicated-ledgers
---
.../meta/LedgerUnderreplicationManager.java | 9 +++++
.../bookkeeper/meta/NullMetadataBookieDriver.java | 2 +
.../meta/ZkLedgerUnderreplicationManager.java | 23 +++++++++++
.../org/apache/bookkeeper/replication/Auditor.java | 47 ++++++++++++++++++++++
.../bookkeeper/replication/ReplicationStats.java | 2 +
5 files changed, 83 insertions(+)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index ac468d3..c24f920 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -237,6 +237,15 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
long getReplicasCheckCTime() throws ReplicationException.UnavailableException;
/**
+ * Receive notification asynchronously when the num of under-replicated ledgers Changed.
+ *
+ * @param cb
+ * @throws ReplicationException.UnavailableException
+ */
+ void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb)
+ throws ReplicationException.UnavailableException;
+
+ /**
* Receive notification asynchronously when the lostBookieRecoveryDelay value is Changed.
*
* @param cb
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java
index 3a4247f..4a1e6e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java
@@ -394,5 +394,7 @@ public class NullMetadataBookieDriver implements MetadataBookieDriver {
throws ReplicationException.UnavailableException {
throw new ReplicationException.UnavailableException("null");
}
+ @Override
+ public void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb) {}
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index b340e07..9dcc81c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.SubTreeCache;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -867,6 +868,28 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
@Override
+ public void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb) throws UnavailableException {
+ LOG.debug("notifyUnderReplicationLedgerChanged()");
+ Watcher w = new Watcher() {
+ @Override
+ public void process(WatchedEvent e) {
+ if (e.getType() == Event.EventType.NodeDeleted && idExtractionPattern.matcher(e.getPath()).find()) {
+ cb.operationComplete(0, null);
+ }
+ }
+ };
+ try {
+ zkc.addWatch(urLedgerPath, w, AddWatchMode.PERSISTENT_RECURSIVE);
+ } catch (KeeperException ke) {
+ LOG.error("Error while checking the state of underReplicated ledgers", ke);
+ throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+ }
+ }
+
+ @Override
public void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb) throws UnavailableException {
LOG.debug("notifyLostBookieRecoveryDelayChanged()");
Watcher w = new Watcher() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index cd85145..c3b69dd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -34,8 +34,10 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAV
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS_GUAGE;
import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE;
@@ -45,6 +47,7 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
@@ -162,6 +165,7 @@ public class Auditor implements AutoCloseable {
private final AtomicInteger numLedgersFoundHavingLessThanAQReplicasOfAnEntry;
private final AtomicInteger numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue;
private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
+ private final AtomicInteger underReplicatedLedgersGuageValue;
private final long underreplicatedLedgerRecoveryGracePeriod;
private final int zkOpTimeoutMs;
private final Semaphore openLedgerNoRecoverySemaphore;
@@ -235,6 +239,11 @@ public class Auditor implements AutoCloseable {
)
private final Counter numDelayedBookieAuditsCancelled;
@StatsDoc(
+ name = NUM_REPLICATED_LEDGERS,
+ help = "the number of replicated ledgers"
+ )
+ private final Counter numReplicatedLedgers;
+ @StatsDoc(
name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
help = "Gauge for number of ledgers not adhering to placement policy found in placement policy check"
)
@@ -266,6 +275,11 @@ public class Auditor implements AutoCloseable {
+ ", this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry"
)
private final Gauge<Integer> numLedgersHavingLessThanWQReplicasOfAnEntry;
+ @StatsDoc(
+ name = NUM_UNDER_REPLICATED_LEDGERS_GUAGE,
+ help = "Gauge for num of underreplicated ledgers"
+ )
+ private final Gauge<Integer> numUnderReplicatedLedgers;
static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
@@ -363,6 +377,7 @@ public class Auditor implements AutoCloseable {
this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
+ this.underReplicatedLedgersGuageValue = new AtomicInteger(0);
numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
uRLPublishTimeForLostBookies = this.statsLogger
@@ -379,6 +394,7 @@ public class Auditor implements AutoCloseable {
numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
+ numReplicatedLedgers = this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS);
numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
@@ -459,6 +475,18 @@ public class Auditor implements AutoCloseable {
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
numLedgersHavingLessThanWQReplicasOfAnEntry);
+ numUnderReplicatedLedgers = new Gauge<Integer>() {
+ @Override
+ public Integer getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Integer getSample() {
+ return underReplicatedLedgersGuageValue.get();
+ }
+ };
+ this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE, numUnderReplicatedLedgers);
this.bkc = bkc;
this.ownBkc = ownBkc;
@@ -703,6 +731,15 @@ public class Auditor implements AutoCloseable {
submitShutdownTask();
}
+ try {
+ this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
+ new UnderReplicatedLedgersChangedCb());
+ } catch (UnavailableException ue) {
+ LOG.error("Exception while registering for under-replicated ledgers change notification, so exiting",
+ ue);
+ submitShutdownTask();
+ }
+
scheduleBookieCheckTask();
scheduleCheckAllLedgersTask();
schedulePlacementPolicyCheckTask();
@@ -1010,6 +1047,16 @@ public class Auditor implements AutoCloseable {
}), initialDelay, interval, TimeUnit.SECONDS);
}
+ private class UnderReplicatedLedgersChangedCb implements GenericCallback<Void> {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = ledgerUnderreplicationManager
+ .listLedgersToRereplicate(null);
+ underReplicatedLedgersGuageValue.set(Iterators.size(underreplicatedLedgersInfo));
+ numReplicatedLedgers.inc();
+ }
+ }
+
private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index d04fcf0..74b76b2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -64,4 +64,6 @@ public interface ReplicationStats {
String REPLICATE_EXCEPTION = "exceptions";
String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
+ String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE";
+ String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS";
}