You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/12 08:24:43 UTC

[GitHub] sijie closed pull request #969: BookieWatcher: add stats for ensemble changes

sijie closed pull request #969: BookieWatcher: add stats for ensemble changes
URL: https://github.com/apache/bookkeeper/pull/969
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 2e104cd56..6faec83e0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -61,6 +61,11 @@
     String GET_BOOKIE_INFO_REQUEST = "GET_BOOKIE_INFO_REQUEST";
     String GET_BOOKIE_INFO = "GET_BOOKIE_INFO";
 
+    // Ensemble Stats
+    String WATCHER_SCOPE = "bookie_watcher";
+    String REPLACE_BOOKIE_TIME = "REPLACE_BOOKIE_TIME";
+    String NEW_ENSEMBLE_TIME = "NEW_ENSEMBLE_TIME";
+
     // Bookie Operations
     String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
     String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 6dddfb1c3..60a62f585 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -396,7 +397,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
     /**
      * Constructor for use with the builder. Other constructors also use it.
      */
-    private BookKeeper(ClientConfiguration conf,
+    @VisibleForTesting
+    BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
                        EventLoopGroup eventLoopGroup,
                        StatsLogger statsLogger,
@@ -489,11 +491,12 @@ private BookKeeper(ClientConfiguration conf,
         } else {
             this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
         }
-
         // initialize bookie client
         this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool,
                                              scheduler, statsLogger);
-        this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, regClient);
+        this.bookieWatcher = new BookieWatcher(
+                conf, this.placementPolicy, regClient,
+                this.statsLogger.scope(WATCHER_SCOPE));
         if (conf.getDiskWeightBasedPlacementEnabled()) {
             LOG.info("Weighted ledger placement enabled");
             ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder()
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 3ecf94726..498a18c51 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -17,6 +17,9 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
+
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
@@ -35,9 +38,12 @@
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 
 /**
  * This class is responsible for maintaining a consistent view of what bookies
@@ -64,6 +70,8 @@
     private final ClientConfiguration conf;
     private final RegistrationClient registrationClient;
     private final EnsemblePlacementPolicy placementPolicy;
+    private final OpStatsLogger newEnsembleTimer;
+    private final OpStatsLogger replaceBookieTimer;
 
     // Bookies that will not be preferred to be chosen in a new ensemble
     final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
@@ -76,7 +84,8 @@
 
     public BookieWatcher(ClientConfiguration conf,
                          EnsemblePlacementPolicy placementPolicy,
-                         RegistrationClient registrationClient) {
+                         RegistrationClient registrationClient,
+                         StatsLogger statsLogger)  {
         this.conf = conf;
         this.placementPolicy = placementPolicy;
         this.registrationClient = registrationClient;
@@ -90,6 +99,8 @@ public void onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie)
                     }
 
                 }).build();
+        this.newEnsembleTimer = statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
+        this.replaceBookieTimer = statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
     }
 
     public Set<BookieSocketAddress> getBookies() throws BKException {
@@ -191,18 +202,23 @@ public void initialBlockingBookieRead() throws BKException {
     public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
         int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
+        long startTime = MathUtils.nowInNano();
+        ArrayList<BookieSocketAddress> socketAddresses;
         try {
-            // we try to only get from the healthy bookies first
-            return placementPolicy.newEnsemble(ensembleSize,
+            socketAddresses = placementPolicy.newEnsemble(ensembleSize,
                     writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(
-                    quarantinedBookies.asMap().keySet()));
+                            quarantinedBookies.asMap().keySet()));
+            // we try to only get from the healthy bookies first
+            newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.newEnsemble(
-                ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
+            socketAddresses = placementPolicy.newEnsemble(
+                    ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
+            newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         }
+        return socketAddresses;
     }
 
     /**
@@ -219,20 +235,27 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
                                              List<BookieSocketAddress> existingBookies, int bookieIdx,
                                              Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
+        long startTime = MathUtils.nowInNano();
         BookieSocketAddress addr = existingBookies.get(bookieIdx);
+        BookieSocketAddress socketAddress;
         try {
             // we exclude the quarantined bookies also first
             Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
             existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
-            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
+            socketAddress = placementPolicy.replaceBookie(
+                    ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     existingAndQuarantinedBookies, addr, excludeBookies);
+            replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
+            socketAddress = placementPolicy.replaceBookie(
+                    ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
+            replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         }
+        return socketAddress;
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index e6b8fa1ad..4981159c0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -30,6 +30,8 @@
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.zookeeper.ZooKeeper;
 
 /**
@@ -38,9 +40,19 @@
  */
 @Slf4j
 public class BookKeeperTestClient extends BookKeeper {
-    public BookKeeperTestClient(ClientConfiguration conf)
+    TestStatsProvider statsProvider;
+
+    public BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsProvider)
             throws IOException, InterruptedException, BKException {
-        super(conf);
+        super(conf, null, null,
+              statsProvider == null ? NullStatsLogger.INSTANCE : statsProvider.getStatsLogger(""),
+              null, null, null);
+        this.statsProvider = statsProvider;
+    }
+
+    public BookKeeperTestClient(ClientConfiguration conf)
+            throws InterruptedException, BKException, IOException {
+        this(conf, null);
     }
 
     public ZooKeeper getZkHandle() {
@@ -95,4 +107,8 @@ public BookieClient getBookieClient() {
         regClient.watchReadOnlyBookies(readOnlyListener);
         return CompletableFuture.allOf(writableFuture, readOnlyFuture);
     }
+
+    public TestStatsProvider getTestStatsProvider() {
+        return statsProvider;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 9f424fcf6..b89f80055 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -20,6 +20,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -175,6 +179,7 @@ public void testChangeEnsembleIfBrokenAckQuorum() throws Exception {
         startNewBookie();
         startNewBookie();
 
+        bkc.getTestStatsProvider().clear();
         LedgerHandle lh = bkc.createLedger(5, 5, 3, digestType, testPasswd);
 
         byte[] data = "foobar".getBytes();
@@ -184,6 +189,17 @@ public void testChangeEnsembleIfBrokenAckQuorum() throws Exception {
             lh.addEntry(data);
         }
 
+        assertTrue(
+                "Stats should have captured a new ensemble",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + NEW_ENSEMBLE_TIME)
+                        .getSuccessCount() > 0);
+        assertTrue(
+                "Stats should not have captured an ensemble change",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + REPLACE_BOOKIE_TIME)
+                        .getSuccessCount() == 0);
+
         logger.info("Kill bookie 0 and write {} entries.", numEntries);
 
         // kill two bookies, but we still have 3 bookies for the ack quorum.
@@ -196,6 +212,11 @@ public void testChangeEnsembleIfBrokenAckQuorum() throws Exception {
         // ensure there is no ensemble changed
         assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                      1, lh.getLedgerMetadata().getEnsembles().size());
+        assertTrue(
+                "Stats should not have captured an ensemble change",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + REPLACE_BOOKIE_TIME)
+                        .getSuccessCount() == 0);
 
         logger.info("Kill bookie 1 and write another {} entries.", numEntries);
 
@@ -217,9 +238,14 @@ public void testChangeEnsembleIfBrokenAckQuorum() throws Exception {
             lh.addEntry(data);
         }
 
-        // ensemble change should kill in
+        // ensemble change should kick in
         assertEquals("There should be ensemble change if ack quorum couldn't be formed.",
                      2, lh.getLedgerMetadata().getEnsembles().size());
+        assertTrue(
+                "Stats should have captured an ensemble change",
+                bkc.getTestStatsProvider().getOpStatsLogger(
+                        CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + REPLACE_BOOKIE_TIME)
+                        .getSuccessCount() > 0);
 
         ArrayList<BookieSocketAddress> firstFragment = lh.getLedgerMetadata().getEnsemble(0);
         ArrayList<BookieSocketAddress> secondFragment = lh.getLedgerMetadata().getEnsemble(3 * numEntries);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 300b6f003..ae74c8d0e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -169,7 +169,7 @@ protected void stopZKCluster() throws Exception {
     protected void startBKCluster() throws Exception {
         baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
         if (numBookies > 0) {
-            bkc = new BookKeeperTestClient(baseClientConf);
+            bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
 
         // Create Bookie Servers (B1, B2, B3)
@@ -567,7 +567,7 @@ protected BookieServer startBookie(ServerConfiguration conf)
         bsLoggers.put(address, provider);
 
         if (bkc == null) {
-            bkc = new BookKeeperTestClient(baseClientConf);
+            bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
 
         Future<?> waitForBookie = conf.isForceReadOnlyBookie()
@@ -605,7 +605,7 @@ protected Bookie newBookie(ServerConfiguration conf) {
 
         BookieSocketAddress address = Bookie.getBookieAddress(conf);
         if (bkc == null) {
-            bkc = new BookKeeperTestClient(baseClientConf);
+            bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
         Future<?> waitForBookie = conf.isForceReadOnlyBookie()
             ? bkc.waitForReadOnlyBookie(address)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services