You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by an...@apache.org on 2019/03/04 23:04:13 UTC

[zookeeper] branch master updated: ZOOKEEPER-3267: Add watcher metrics

This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dafc40d  ZOOKEEPER-3267: Add watcher metrics
dafc40d is described below

commit dafc40d59f3d4ed1d2586bf93fc340d20fb6f6c4
Author: Jie Huang <ji...@fb.com>
AuthorDate: Mon Mar 4 17:04:10 2019 -0600

    ZOOKEEPER-3267: Add watcher metrics
    
    Author: Jie Huang <ji...@fb.com>
    
    Reviewers: fangmin@apache.org, andor@apache.org
    
    Closes #796 from jhuan31/ZOOKEEPER-3267 and squashes the following commits:
    
    4dafb7e94 [Jie Huang] refactor/de-flaky unit tests
    9bdc93a04 [Jie Huang] fix ADD_DEAD_WATCHERS_STALL_TIME measurement
    8e45fc253 [Jie Huang] add unit tests for the watcher metrics
    2d786e455 [Jie Huang] ZOOKEEPER-3267: Add watcher metrics
---
 .../org/apache/zookeeper/server/ServerMetrics.java | 17 ++++++
 .../zookeeper/server/watch/WatchManager.java       | 23 +++++++
 .../server/watch/WatchManagerOptimized.java        | 28 +++++++++
 .../zookeeper/server/watch/WatcherCleaner.java     |  7 +++
 .../zookeeper/server/watch/WatchManagerTest.java   | 71 +++++++++++++++++++++-
 .../zookeeper/server/watch/WatcherCleanerTest.java | 41 +++++++++++++
 6 files changed, 186 insertions(+), 1 deletion(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index ffa1317..bc66d66 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -76,6 +76,23 @@ public enum ServerMetrics {
 
     BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")),
 
+    /**
+     * Fired watcher stats.
+     */
+    NODE_CREATED_WATCHER(new AvgMinMaxCounter("node_created_watch_count")),
+    NODE_DELETED_WATCHER(new AvgMinMaxCounter("node_deleted_watch_count")),
+    NODE_CHANGED_WATCHER(new AvgMinMaxCounter("node_changed_watch_count")),
+    NODE_CHILDREN_WATCHER(new AvgMinMaxCounter("node_children_watch_count")),
+
+
+    /*
+     * WatcherCleaner stats: stall time while queueing, dead watchers queued/cleared, cleaning latency
+     */
+    ADD_DEAD_WATCHER_STALL_TIME(new SimpleCounter("add_dead_watcher_stall_time")),
+    DEAD_WATCHERS_QUEUED(new SimpleCounter("dead_watchers_queued")),
+    DEAD_WATCHERS_CLEARED(new SimpleCounter("dead_watchers_cleared")),
+    DEAD_WATCHERS_CLEANER_LATENCY(new AvgMinMaxPercentileCounter("dead_watchers_cleaner_latency")),
+
     RESPONSE_PACKET_CACHE_HITS(new SimpleCounter("response_packet_cache_hits")),
     RESPONSE_PACKET_CACHE_MISSING(new SimpleCounter("response_packet_cache_misses")),
     
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
index 3e14f6e..3ba866d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
@@ -31,6 +31,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooTrace;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,6 +139,28 @@ public class WatchManager implements IWatchManager {
             }
             w.process(e);
         }
+
+        switch (type) {
+        case NodeCreated:
+            ServerMetrics.NODE_CREATED_WATCHER.add(watchers.size());
+            break;
+
+        case NodeDeleted:
+            ServerMetrics.NODE_DELETED_WATCHER.add(watchers.size());
+            break;
+
+        case NodeDataChanged:
+            ServerMetrics.NODE_CHANGED_WATCHER.add(watchers.size());
+            break;
+
+        case NodeChildrenChanged:
+            ServerMetrics.NODE_CHILDREN_WATCHER.add(watchers.size());
+            break;
+        default:
+            // Other types not logged.
+            break;
+        }
+
         return new WatcherOrBitSet(watchers);
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
index 6abb760..f0c8fe4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
@@ -33,6 +33,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.util.BitHashSet;
 import org.apache.zookeeper.server.util.BitMap;
 import org.slf4j.Logger;
@@ -222,6 +223,8 @@ public class WatchManagerOptimized
             return null;
         }
 
+        int triggeredWatches = 0;
+
         // Avoid race condition between dead watcher cleaner in
         // WatcherCleaner and iterating here
         synchronized (watchers) {
@@ -238,9 +241,11 @@ public class WatchManagerOptimized
                 }
 
                 w.process(e);
+                triggeredWatches++;
             }
         }
 
+        updateMetrics(type, triggeredWatches);
         return new WatcherOrBitSet(watchers);
     }
 
@@ -269,6 +274,29 @@ public class WatchManagerOptimized
         }
     }
 
+    void updateMetrics(final EventType type, int size) {
+        switch (type) {
+        case NodeCreated:
+            ServerMetrics.NODE_CREATED_WATCHER.add(size);
+            break;
+
+        case NodeDeleted:
+            ServerMetrics.NODE_DELETED_WATCHER.add(size);
+            break;
+
+        case NodeDataChanged:
+            ServerMetrics.NODE_CHANGED_WATCHER.add(size);
+            break;
+
+        case NodeChildrenChanged:
+            ServerMetrics.NODE_CHILDREN_WATCHER.add(size);
+            break;
+        default:
+            // Other types not logged.
+            break;
+        }
+    }
+
     boolean isDeadWatcher(Watcher watcher) {
         return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java
index 9648848..6b67877 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.server.RateLogger;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.WorkerService;
 import org.apache.zookeeper.server.WorkerService.WorkRequest;
 
@@ -103,9 +104,12 @@ public class WatcherCleaner extends Thread {
                 totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
             try {
                 RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
+                long startTime = Time.currentElapsedTime();
                 synchronized(processingCompletedEvent) {
                     processingCompletedEvent.wait(100);
                 }
+                long latency = Time.currentElapsedTime() - startTime;
+                ServerMetrics.ADD_DEAD_WATCHER_STALL_TIME.add(latency);
             } catch (InterruptedException e) {
                 LOG.info("Got interrupted while waiting for dead watches " +
                         "queue size");
@@ -115,6 +119,7 @@ public class WatcherCleaner extends Thread {
         synchronized (this) {
             if (deadWatchers.add(watcherBit)) {
                 totalDeadWatchers.incrementAndGet();
+                ServerMetrics.DEAD_WATCHERS_QUEUED.add(1);
                 if (deadWatchers.size() >= watcherCleanThreshold) {
                     synchronized (cleanEvent) {
                         cleanEvent.notifyAll();
@@ -164,6 +169,8 @@ public class WatcherCleaner extends Thread {
                         listener.processDeadWatchers(snapshot);
                         long latency = Time.currentElapsedTime() - startTime;
                         LOG.info("Takes {} to process {} watches", latency, total);
+                        ServerMetrics.DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
+                        ServerMetrics.DEAD_WATCHERS_CLEARED.add(total);
                         totalDeadWatchers.addAndGet(-total);
                         synchronized(processingCompletedEvent) {
                             processingCompletedEvent.notifyAll();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
index f6a229b..166cfff 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
@@ -33,6 +33,10 @@ import org.apache.zookeeper.server.DumbWatcher;
 import org.apache.zookeeper.server.ServerCnxn;
 
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.metric.AvgMinMaxCounter;
+import org.apache.zookeeper.server.metric.Metric;
+import org.eclipse.jetty.util.IO;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -390,8 +394,10 @@ public class WatchManagerTest extends ZKTestCase {
         }
 
         // 4. sleep for a while to make sure all the thread exited
+        // the cleaner may wait as long as CleanerInterval+CleanerInterval/2+1
+        // So need to sleep at least that long
         try {
-            Thread.sleep(1000);
+            Thread.sleep(2000);
         } catch (InterruptedException e) {}
 
         // 5. make sure the dead watchers are not in the existing watchers
@@ -401,4 +407,67 @@ public class WatchManagerTest extends ZKTestCase {
                     existingWatchers.hasPaths(((ServerCnxn) w).getSessionId()));
         }
     }
+
+    private void checkMetrics(String metricName, long min, long max, double avg, long cnt, long sum){
+        Map<String, Object> values = ServerMetrics.getAllValues();
+
+        Assert.assertEquals(min, values.get("min_" + metricName));
+        Assert.assertEquals(max, values.get("max_" + metricName));
+        Assert.assertEquals(avg, (Double)values.get("avg_" + metricName), 0.000001);
+        Assert.assertEquals(cnt, values.get("cnt_" + metricName));
+        Assert.assertEquals(sum, values.get("sum_" + metricName));
+    }
+
+    @Test
+    public void testWatcherMetrics() throws IOException {
+        IWatchManager manager = getWatchManager();
+        ServerMetrics.resetAll();
+
+        DumbWatcher watcher1 = new DumbWatcher(1);
+        DumbWatcher watcher2 = new DumbWatcher(2);
+
+        final String path1 = "/path1";
+        final String path2 = "/path2";
+
+        final String path3 = "/path3";
+
+        //both watcher1 and watcher2 are watching path1
+        manager.addWatch(path1, watcher1);
+        manager.addWatch(path1, watcher2);
+
+        //path2 is watched by watcher1
+        manager.addWatch(path2, watcher1);
+
+        manager.triggerWatch(path3, EventType.NodeCreated);
+        //path3 is not being watched so metric is 0
+        checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L);
+
+        //path1 is watched by two watchers so two fired
+        manager.triggerWatch(path1, EventType.NodeCreated);
+        checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L);
+
+        //path2 is watched by one watcher so one fired now total is 3
+        manager.triggerWatch(path2, EventType.NodeCreated);
+        checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
+
+        //watches on path1 are no longer there so zero fired
+        manager.triggerWatch(path1, EventType.NodeDataChanged);
+        checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L);
+
+        //both watcher1 and watcher2 are watching path1
+        manager.addWatch(path1, watcher1);
+        manager.addWatch(path1, watcher2);
+
+        //path2 is watched by watcher1
+        manager.addWatch(path2, watcher1);
+
+        manager.triggerWatch(path1, EventType.NodeDataChanged);
+        checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L);
+
+        manager.triggerWatch(path2, EventType.NodeDeleted);
+        checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L);
+
+        //make sure that node created watch count is not impacted by the fire of other event types
+        checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
index d315232..64989ce 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.zookeeper.server.watch;
 
+import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.CountDownLatch;
@@ -23,10 +24,16 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.junit.Test;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 
 public class WatcherCleanerTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleanerTest.class);
 
     public static class MyDeadWatcherListener implements IDeadWatcherListener {
 
@@ -124,4 +131,38 @@ public class WatcherCleanerTest extends ZKTestCase {
         Assert.assertTrue(Time.currentElapsedTime() - startTime >= delayMs);
         Assert.assertTrue(listener.wait(5000));
     }
+
+    @Test
+    public void testDeadWatcherMetrics() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 1, 1, 1);
+        listener.setDelayMs(20);
+        cleaner.start();
+        listener.setCountDownLatch(new CountDownLatch(3));
+        //the dead watchers will be added one by one and cleared one by one because we set both watchCleanThreshold and
+        //maxInProcessingDeadWatchers to 1
+        cleaner.addDeadWatcher(1);
+        cleaner.addDeadWatcher(2);
+        cleaner.addDeadWatcher(3);
+
+        Assert.assertTrue(listener.wait(5000));
+
+        Map<String, Object> values = ServerMetrics.getAllValues();
+
+        Assert.assertThat("Adding dead watcher should be stalled twice",
+                          (Long)values.get("add_dead_watcher_stall_time"),
+                           greaterThan(0L));
+        Assert.assertEquals("Total dead watchers added to the queue should be 3", 3L, values.get("dead_watchers_queued"));
+        Assert.assertEquals("Total dead watchers cleared should be 3", 3L, values.get("dead_watchers_cleared"));
+
+        Assert.assertEquals(3L, values.get("cnt_dead_watchers_cleaner_latency"));
+
+        //Each latency should be a little over 20 ms, allow 5 ms deviation
+        Assert.assertEquals(20D, (Double)values.get("avg_dead_watchers_cleaner_latency"), 5);
+        Assert.assertEquals(20D, ((Long)values.get("min_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("max_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("p50_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("p95_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("p99_dead_watchers_cleaner_latency")).doubleValue(), 5);
+    }
 }