You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fa...@apache.org on 2019/05/06 15:49:06 UTC

[zookeeper] branch master updated: ZOOKEEPER-3305: Add Quorum Packet metrics

This is an automated email from the ASF dual-hosted git repository.

fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a25fe4  ZOOKEEPER-3305: Add Quorum Packet metrics
3a25fe4 is described below

commit 3a25fe4b6c07937dc6859cafd94f3532b3a0befe
Author: Jie Huang <ji...@fb.com>
AuthorDate: Mon May 6 08:48:57 2019 -0700

    ZOOKEEPER-3305: Add Quorum Packet metrics
    
    Author: Jie Huang <ji...@fb.com>
    
    Reviewers: eolivelli@apache.org, fangmin@apache.org
    
    Closes #849 from jhuan31/ZOOKEEPER-3305
---
 .../org/apache/zookeeper/server/ServerMetrics.java |  11 ++
 .../zookeeper/server/quorum/LearnerHandler.java    |  61 +++++++++++
 .../server/quorum/LearnerHandlerMetricsTest.java   | 114 +++++++++++++++++++++
 .../server/quorum/LearnerHandlerTest.java          |   5 +
 4 files changed, 191 insertions(+)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 6ff2474..37ffee1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -192,6 +192,11 @@ public final class ServerMetrics {
         LEARNER_PROPOSAL_RECEIVED_COUNT = metricsContext.getCounter("learner_proposal_received_count");
         LEARNER_COMMIT_RECEIVED_COUNT = metricsContext.getCounter("learner_commit_received_count");
 
+        /**
+         * Learner handler quorum packet metrics.
+         */
+        LEARNER_HANDLER_QP_SIZE = metricsContext.getSummarySet("learner_handler_qp_size", DetailLevel.BASIC);
+        LEARNER_HANDLER_QP_TIME = metricsContext.getSummarySet("learner_handler_qp_time_ms", DetailLevel.ADVANCED);
     }
 
     /**
@@ -287,6 +292,12 @@ public final class ServerMetrics {
     public final Counter RESPONSE_PACKET_CACHE_HITS;
     public final Counter RESPONSE_PACKET_CACHE_MISSING;
 
+    /**
+     * Learner handler quorum packet metrics.
+     */
+    public final SummarySet LEARNER_HANDLER_QP_SIZE;
+    public final SummarySet LEARNER_HANDLER_QP_TIME;
+
     /*
      * Number of requests that are in the session queue.
      */
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 539f579..09e7d5f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -26,8 +26,10 @@ import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
@@ -99,6 +101,14 @@ public class LearnerHandler extends ZooKeeperThread {
         new LinkedBlockingQueue<QuorumPacket>();
 
     /**
+     * Marker packets would be added to quorum packet queue after every
+     * markerPacketInterval packets.
+     * It is ok if packetCounter overflows.
+     */
+    private final int markerPacketInterval = 1000;
+    private AtomicInteger packetCounter = new AtomicInteger();
+
+    /**
      * This class controls the time that the Leader has been
      * waiting for acknowledgement of a proposal from this Learner.
      * If the time is above syncLimit, the connection will be closed.
@@ -155,6 +165,26 @@ public class LearnerHandler extends ZooKeeperThread {
 
     private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
 
+    private static class MarkerQuorumPacket extends QuorumPacket {
+        long time;
+        MarkerQuorumPacket(long time) {
+            this.time = time;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(time);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            MarkerQuorumPacket that = (MarkerQuorumPacket) o;
+            return time == that.time;
+        }
+    };
+
     private BinaryInputArchive ia;
 
     private BinaryOutputArchive oa;
@@ -162,6 +192,14 @@ public class LearnerHandler extends ZooKeeperThread {
     private final BufferedInputStream bufferedInput;
     private BufferedOutputStream bufferedOutput;
 
+    // for test only
+    protected void setOutputArchive(BinaryOutputArchive oa) {
+        this.oa = oa;
+    }
+    protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
+        this.bufferedOutput = bufferedOutput;
+    }
+
     /**
      * Keep track of whether we have started send packets thread
      */
@@ -249,6 +287,16 @@ public class LearnerHandler extends ZooKeeperThread {
                     p = queuedPackets.take();
                 }
 
+                ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(this.sid), queuedPackets.size());
+
+                if (p instanceof MarkerQuorumPacket) {
+                    MarkerQuorumPacket m = (MarkerQuorumPacket)p;
+                    ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME.add(
+                            Long.toString(this.sid),
+                            (System.nanoTime() - m.time) / 1000000L);
+                    continue;
+                }
+
                 if (p == proposalOfDeath) {
                     // Packet of death!
                     break;
@@ -651,6 +699,14 @@ public class LearnerHandler extends ZooKeeperThread {
     }
 
     /**
+     * Tests need not send marker packets as they are only needed to
+     * log quorum packet delays
+     */
+    protected boolean shouldSendMarkerPacketForLogging() {
+        return true;
+    }
+
+    /**
      * Determine if we need to sync with follower using DIFF/TRUNC/SNAP
      * and setup follower to receive packets from commit processor
      *
@@ -964,6 +1020,11 @@ public class LearnerHandler extends ZooKeeperThread {
 
     void queuePacket(QuorumPacket p) {
         queuedPackets.add(p);
+        // Add a MarkerQuorumPacket at regular intervals.
+        if (shouldSendMarkerPacketForLogging() &&
+                packetCounter.getAndIncrement() % markerPacketInterval == 0) {
+            queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
+        }
     }
 
     static long packetSize(QuorumPacket p) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerMetricsTest.java
new file mode 100644
index 0000000..dc67147
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerMetricsTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Map;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LearnerHandlerMetricsTest {
+    private MockLearnerHandler learnerHandler;
+    private long sid = 5;
+
+    class MockLearnerHandler extends LearnerHandler {
+        MockLearnerHandler(Socket socket, Leader leader) throws IOException {
+            super(socket, null, leader);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        Leader leader = mock(Leader.class);
+        when(leader.getQuorumAuthServer()).thenReturn(null);
+
+        Socket socket = mock(Socket.class);
+        when(socket.getRemoteSocketAddress()).thenReturn(new InetSocketAddress(32));
+
+        //adding 5ms artificial delay when sending each packet
+        BinaryOutputArchive oa = mock(BinaryOutputArchive.class);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Thread.sleep(5);
+                return  null;
+            }
+        }).when(oa).writeRecord(any(QuorumPacket.class), Matchers.anyString());
+
+        learnerHandler = new MockLearnerHandler(socket, leader);
+        learnerHandler.setOutputArchive(oa);
+        learnerHandler.setBufferedOutput(mock(BufferedOutputStream.class));
+        learnerHandler.sid = sid;
+    }
+
+    @Test
+    public void testMetrics() {
+        ServerMetrics.getMetrics().resetAll();
+
+        //adding 1001 packets in the queue, two marker packets will be added since the interval is every 1000 packets
+        for (int i=0; i<1001; i++) {
+            learnerHandler.queuePacket(new QuorumPacket());
+        }
+        learnerHandler.startSendingPackets();
+
+        //make sure we have enough time to send all the packets in the queue
+        try {
+            Thread.sleep(8000);
+        } catch (Exception e) {
+
+        }
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        String sidStr = Long.toString(sid);
+
+        //we record time for each marker packet and we have two marker packets
+        Assert.assertEquals(2L,  values.get("cnt_" + sidStr + "_learner_handler_qp_time_ms"));
+
+        //the second marker has 1000 packets in front of it and each takes 5 ms to send so the time in queue should be
+        //longer than 5*1000
+        Assert.assertThat((long)values.get("max_" + sidStr + "_learner_handler_qp_time_ms"), greaterThan(5000L));
+
+        //we send 1001 packets + 2 marker packets so the queue size is recorded 1003 times
+        Assert.assertEquals(1003L, values.get("cnt_" + sidStr + "_learner_handler_qp_size"));
+
+        //the longest queue size is recorded when we are sending the first packet
+        Assert.assertEquals(1002L, values.get("max_" + sidStr + "_learner_handler_qp_size"));
+
+        //this is when the queue is emptied
+        Assert.assertEquals(0L, values.get("min_" + sidStr + "_learner_handler_qp_size"));
+
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index 1cd33ec..c15a6bf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -61,6 +61,11 @@ public class LearnerHandlerTest extends ZKTestCase {
         protected void startSendingPackets() {
             threadStarted = true;
         }
+
+        @Override
+        protected boolean shouldSendMarkerPacketForLogging() {
+            return false;
+        }
     }
 
     class MockZKDatabase extends ZKDatabase {