You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/12 08:24:38 UTC

[bookkeeper] branch master updated: BookieWatcher: add stats for ensemble changes

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c53756b  BookieWatcher: add stats for ensemble changes
c53756b is described below

commit c53756bdb1fafb3221b32222602cbc86327e17a7
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Fri Jan 12 00:24:31 2018 -0800

    BookieWatcher: add stats for ensemble changes
    
    (bug W-3019451)
    Signed-off-by: Dustin Castor <dcastor@salesforce.com>
    [Fixed up merge/checkstyle issues, added tests]
    Signed-off-by: Samuel Just <sjust@salesforce.com>
    
    Author: Samuel Just <sj...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #969 from athanatos/forupstream/stats1/ensemblechange
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |  5 +++
 .../org/apache/bookkeeper/client/BookKeeper.java   |  9 +++--
 .../apache/bookkeeper/client/BookieWatcher.java    | 39 +++++++++++++++++-----
 .../bookkeeper/client/BookKeeperTestClient.java    | 20 +++++++++--
 .../bookkeeper/client/TestDelayEnsembleChange.java | 28 +++++++++++++++-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  6 ++--
 6 files changed, 90 insertions(+), 17 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 7e1969c..636e9fd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -61,6 +61,11 @@ public interface BookKeeperServerStats {
     String GET_BOOKIE_INFO_REQUEST = "GET_BOOKIE_INFO_REQUEST";
     String GET_BOOKIE_INFO = "GET_BOOKIE_INFO";
 
+    // Ensemble Stats
+    String WATCHER_SCOPE = "bookie_watcher";
+    String REPLACE_BOOKIE_TIME = "REPLACE_BOOKIE_TIME";
+    String NEW_ENSEMBLE_TIME = "NEW_ENSEMBLE_TIME";
+
     // Bookie Operations
     String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
     String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index a8c181b..f07986d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -395,7 +396,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
     /**
      * Constructor for use with the builder. Other constructors also use it.
      */
-    private BookKeeper(ClientConfiguration conf,
+    @VisibleForTesting
+    BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
                        EventLoopGroup eventLoopGroup,
                        StatsLogger statsLogger,
@@ -491,11 +493,12 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         } else {
             this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
         }
-
         // initialize bookie client
         this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool,
                                              scheduler, statsLogger);
-        this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, regClient);
+        this.bookieWatcher = new BookieWatcher(
+                conf, this.placementPolicy, regClient,
+                this.statsLogger.scope(WATCHER_SCOPE));
         if (conf.getDiskWeightBasedPlacementEnabled()) {
             LOG.info("Weighted ledger placement enabled");
             ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder()
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 3ecf947..498a18c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -17,6 +17,9 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
+
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
@@ -35,9 +38,12 @@ import org.apache.bookkeeper.client.BKException.BKInterruptedException;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 
 /**
  * This class is responsible for maintaining a consistent view of what bookies
@@ -64,6 +70,8 @@ class BookieWatcher {
     private final ClientConfiguration conf;
     private final RegistrationClient registrationClient;
     private final EnsemblePlacementPolicy placementPolicy;
+    private final OpStatsLogger newEnsembleTimer;
+    private final OpStatsLogger replaceBookieTimer;
 
     // Bookies that will not be preferred to be chosen in a new ensemble
     final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
@@ -76,7 +84,8 @@ class BookieWatcher {
 
     public BookieWatcher(ClientConfiguration conf,
                          EnsemblePlacementPolicy placementPolicy,
-                         RegistrationClient registrationClient) {
+                         RegistrationClient registrationClient,
+                         StatsLogger statsLogger)  {
         this.conf = conf;
         this.placementPolicy = placementPolicy;
         this.registrationClient = registrationClient;
@@ -90,6 +99,8 @@ class BookieWatcher {
                     }
 
                 }).build();
+        this.newEnsembleTimer = statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
+        this.replaceBookieTimer = statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
     }
 
     public Set<BookieSocketAddress> getBookies() throws BKException {
@@ -191,18 +202,23 @@ class BookieWatcher {
     public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
         int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
+        long startTime = MathUtils.nowInNano();
+        ArrayList<BookieSocketAddress> socketAddresses;
         try {
-            // we try to only get from the healthy bookies first
-            return placementPolicy.newEnsemble(ensembleSize,
+            socketAddresses = placementPolicy.newEnsemble(ensembleSize,
                     writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(
-                    quarantinedBookies.asMap().keySet()));
+                            quarantinedBookies.asMap().keySet()));
+            // we try to only get from the healthy bookies first
+            newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.newEnsemble(
-                ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
+            socketAddresses = placementPolicy.newEnsemble(
+                    ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
+            newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         }
+        return socketAddresses;
     }
 
     /**
@@ -219,20 +235,27 @@ class BookieWatcher {
                                              List<BookieSocketAddress> existingBookies, int bookieIdx,
                                              Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
+        long startTime = MathUtils.nowInNano();
         BookieSocketAddress addr = existingBookies.get(bookieIdx);
+        BookieSocketAddress socketAddress;
         try {
             // we exclude the quarantined bookies also first
             Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
             existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
-            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
+            socketAddress = placementPolicy.replaceBookie(
+                    ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     existingAndQuarantinedBookies, addr, excludeBookies);
+            replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
+            socketAddress = placementPolicy.replaceBookie(
+                    ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
+            replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         }
+        return socketAddress;
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index e6b8fa1..4981159 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -30,6 +30,8 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.zookeeper.ZooKeeper;
 
 /**
@@ -38,9 +40,19 @@ import org.apache.zookeeper.ZooKeeper;
  */
 @Slf4j
 public class BookKeeperTestClient extends BookKeeper {
-    public BookKeeperTestClient(ClientConfiguration conf)
+    TestStatsProvider statsProvider;
+
+    public BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsProvider)
             throws IOException, InterruptedException, BKException {
-        super(conf);
+        super(conf, null, null,
+              statsProvider == null ? NullStatsLogger.INSTANCE : statsProvider.getStatsLogger(""),
+              null, null, null);
+        this.statsProvider = statsProvider;
+    }
+
+    public BookKeeperTestClient(ClientConfiguration conf)
+            throws InterruptedException, BKException, IOException {
+        this(conf, null);
     }
 
     public ZooKeeper getZkHandle() {
@@ -95,4 +107,8 @@ public class BookKeeperTestClient extends BookKeeper {
         regClient.watchReadOnlyBookies(readOnlyListener);
         return CompletableFuture.allOf(writableFuture, readOnlyFuture);
     }
+
+    public TestStatsProvider getTestStatsProvider() {
+        return statsProvider;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 9f424fc..b89f800 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -20,6 +20,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -175,6 +179,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         startNewBookie();
         startNewBookie();
 
+        bkc.getTestStatsProvider().clear();
         LedgerHandle lh = bkc.createLedger(5, 5, 3, digestType, testPasswd);
 
         byte[] data = "foobar".getBytes();
@@ -184,6 +189,17 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
             lh.addEntry(data);
         }
 
+        assertTrue(
+                "Stats should have captured a new ensemble",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + NEW_ENSEMBLE_TIME)
+                        .getSuccessCount() > 0);
+        assertTrue(
+                "Stats should not have captured an ensemble change",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + REPLACE_BOOKIE_TIME)
+                        .getSuccessCount() == 0);
+
         logger.info("Kill bookie 0 and write {} entries.", numEntries);
 
         // kill two bookies, but we still have 3 bookies for the ack quorum.
@@ -196,6 +212,11 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         // ensure there is no ensemble changed
         assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                      1, lh.getLedgerMetadata().getEnsembles().size());
+        assertTrue(
+                "Stats should not have captured an ensemble change",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + REPLACE_BOOKIE_TIME)
+                        .getSuccessCount() == 0);
 
         logger.info("Kill bookie 1 and write another {} entries.", numEntries);
 
@@ -217,9 +238,14 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
             lh.addEntry(data);
         }
 
-        // ensemble change should kill in
+        // ensemble change should kick in
         assertEquals("There should be ensemble change if ack quorum couldn't be formed.",
                      2, lh.getLedgerMetadata().getEnsembles().size());
+        assertTrue(
+                "Stats should have captured an ensemble change",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + REPLACE_BOOKIE_TIME)
+                        .getSuccessCount() > 0);
 
         ArrayList<BookieSocketAddress> firstFragment = lh.getLedgerMetadata().getEnsemble(0);
         ArrayList<BookieSocketAddress> secondFragment = lh.getLedgerMetadata().getEnsemble(3 * numEntries);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 5ed514f..ee48cb9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -169,7 +169,7 @@ public abstract class BookKeeperClusterTestCase {
     protected void startBKCluster() throws Exception {
         baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
         if (numBookies > 0) {
-            bkc = new BookKeeperTestClient(baseClientConf);
+            bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
 
         // Create Bookie Servers (B1, B2, B3)
@@ -567,7 +567,7 @@ public abstract class BookKeeperClusterTestCase {
         bsLoggers.put(address, provider);
 
         if (bkc == null) {
-            bkc = new BookKeeperTestClient(baseClientConf);
+            bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
 
         Future<?> waitForBookie = conf.isForceReadOnlyBookie()
@@ -605,7 +605,7 @@ public abstract class BookKeeperClusterTestCase {
 
         BookieSocketAddress address = Bookie.getBookieAddress(conf);
         if (bkc == null) {
-            bkc = new BookKeeperTestClient(baseClientConf);
+            bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
         Future<?> waitForBookie = conf.isForceReadOnlyBookie()
             ? bkc.waitForReadOnlyBookie(address)

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>.