You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fa...@apache.org on 2019/05/06 15:49:06 UTC
[zookeeper] branch master updated: ZOOKEEPER-3305: Add Quorum Packet metrics
This is an automated email from the ASF dual-hosted git repository.
fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3a25fe4 ZOOKEEPER-3305: Add Quorum Packet metrics
3a25fe4 is described below
commit 3a25fe4b6c07937dc6859cafd94f3532b3a0befe
Author: Jie Huang <ji...@fb.com>
AuthorDate: Mon May 6 08:48:57 2019 -0700
ZOOKEEPER-3305: Add Quorum Packet metrics
Author: Jie Huang <ji...@fb.com>
Reviewers: eolivelli@apache.org, fangmin@apache.org
Closes #849 from jhuan31/ZOOKEEPER-3305
---
.../org/apache/zookeeper/server/ServerMetrics.java | 11 ++
.../zookeeper/server/quorum/LearnerHandler.java | 61 +++++++++++
.../server/quorum/LearnerHandlerMetricsTest.java | 114 +++++++++++++++++++++
.../server/quorum/LearnerHandlerTest.java | 5 +
4 files changed, 191 insertions(+)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 6ff2474..37ffee1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -192,6 +192,11 @@ public final class ServerMetrics {
LEARNER_PROPOSAL_RECEIVED_COUNT = metricsContext.getCounter("learner_proposal_received_count");
LEARNER_COMMIT_RECEIVED_COUNT = metricsContext.getCounter("learner_commit_received_count");
+ /**
+ * Learner handler quorum packet metrics.
+ */
+ LEARNER_HANDLER_QP_SIZE = metricsContext.getSummarySet("learner_handler_qp_size", DetailLevel.BASIC);
+ LEARNER_HANDLER_QP_TIME = metricsContext.getSummarySet("learner_handler_qp_time_ms", DetailLevel.ADVANCED);
}
/**
@@ -287,6 +292,12 @@ public final class ServerMetrics {
public final Counter RESPONSE_PACKET_CACHE_HITS;
public final Counter RESPONSE_PACKET_CACHE_MISSING;
+ /**
+ * Learner handler quorum packet metrics.
+ */
+ public final SummarySet LEARNER_HANDLER_QP_SIZE;
+ public final SummarySet LEARNER_HANDLER_QP_TIME;
+
/*
* Number of requests that are in the session queue.
*/
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 539f579..09e7d5f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -26,8 +26,10 @@ import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -99,6 +101,14 @@ public class LearnerHandler extends ZooKeeperThread {
new LinkedBlockingQueue<QuorumPacket>();
/**
+ * A marker packet is added to the quorum packet queue once every
+ * markerPacketInterval packets (see queuePacket).
+ * It is ok if packetCounter overflows: after wrap-around the remainder
+ * check still matches multiples of the interval, so only the spacing of
+ * the markers around the wrap point is irregular.
+ */
+ private final int markerPacketInterval = 1000;
+ // never reassigned; only its value is updated atomically
+ private final AtomicInteger packetCounter = new AtomicInteger();
+
+ /**
* This class controls the time that the Leader has been
* waiting for acknowledgement of a proposal from this Learner.
* If the time is above syncLimit, the connection will be closed.
@@ -155,6 +165,26 @@ public class LearnerHandler extends ZooKeeperThread {
private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
+ /**
+ * Marker placed in the quorum packet queue to measure queueing delay.
+ * When the sending loop dequeues a marker it records the elapsed time
+ * since the marker was created and then skips the rest of that iteration.
+ */
+ private static class MarkerQuorumPacket extends QuorumPacket {
+ // timestamp supplied by the creator (System.nanoTime() in queuePacket);
+ // final because it takes part in equals/hashCode
+ final long time;
+ MarkerQuorumPacket(long time) {
+ this.time = time;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(time);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MarkerQuorumPacket that = (MarkerQuorumPacket) o;
+ return time == that.time;
+ }
+ }
+
private BinaryInputArchive ia;
private BinaryOutputArchive oa;
@@ -162,6 +192,14 @@ public class LearnerHandler extends ZooKeeperThread {
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;
+ // For tests only: the two setters below let a test replace the handler's output archive (oa) and its buffered output stream.
+ protected void setOutputArchive(BinaryOutputArchive oa) {
+ this.oa = oa;
+ }
+ protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
+ this.bufferedOutput = bufferedOutput;
+ }
+
/**
* Keep track of whether we have started send packets thread
*/
@@ -249,6 +287,16 @@ public class LearnerHandler extends ZooKeeperThread {
p = queuedPackets.take();
}
+ ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(this.sid), queuedPackets.size());
+
+ if (p instanceof MarkerQuorumPacket) {
+ MarkerQuorumPacket m = (MarkerQuorumPacket)p;
+ ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME.add(
+ Long.toString(this.sid),
+ (System.nanoTime() - m.time) / 1000000L);
+ continue;
+ }
+
if (p == proposalOfDeath) {
// Packet of death!
break;
@@ -651,6 +699,14 @@ public class LearnerHandler extends ZooKeeperThread {
}
/**
+ * Returns whether queuePacket should insert marker packets. Markers are only needed
+ * to record quorum packet queueing delays, so tests may override this to return false.
+ */
+ protected boolean shouldSendMarkerPacketForLogging() {
+ return true;
+ }
+
+ /**
* Determine if we need to sync with follower using DIFF/TRUNC/SNAP
* and setup follower to receive packets from commit processor
*
@@ -964,6 +1020,11 @@ public class LearnerHandler extends ZooKeeperThread {
void queuePacket(QuorumPacket p) {
queuedPackets.add(p);
+ // When markers are disabled the packet counter is left untouched.
+ if (!shouldSendMarkerPacketForLogging()) {
+ return;
+ }
+ // Follow every markerPacketInterval-th packet with a timestamped marker.
+ int sequence = packetCounter.getAndIncrement();
+ if (sequence % markerPacketInterval == 0) {
+ queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
+ }
}
static long packetSize(QuorumPacket p) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerMetricsTest.java
new file mode 100644
index 0000000..dc67147
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerMetricsTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Map;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies the learner handler quorum packet metrics (queue size and time
+ * spent in the queue) using a handler whose output archive is mocked to take
+ * 5 ms per packet.
+ */
+public class LearnerHandlerMetricsTest {
+ private MockLearnerHandler learnerHandler;
+ private final long sid = 5;
+
+ // static: the mock does not need a reference to the enclosing test instance
+ static class MockLearnerHandler extends LearnerHandler {
+ MockLearnerHandler(Socket socket, Leader leader) throws IOException {
+ super(socket, null, leader);
+ }
+ }
+
+ @Before
+ public void setup() throws IOException {
+ Leader leader = mock(Leader.class);
+ when(leader.getQuorumAuthServer()).thenReturn(null);
+
+ Socket socket = mock(Socket.class);
+ when(socket.getRemoteSocketAddress()).thenReturn(new InetSocketAddress(32));
+
+ //adding 5ms artificial delay when sending each packet
+ BinaryOutputArchive oa = mock(BinaryOutputArchive.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Thread.sleep(5);
+ return null;
+ }
+ }).when(oa).writeRecord(any(QuorumPacket.class), Matchers.anyString());
+
+ learnerHandler = new MockLearnerHandler(socket, leader);
+ learnerHandler.setOutputArchive(oa);
+ learnerHandler.setBufferedOutput(mock(BufferedOutputStream.class));
+ learnerHandler.sid = sid;
+ }
+
+ @Test
+ public void testMetrics() throws InterruptedException {
+ ServerMetrics.getMetrics().resetAll();
+
+ //adding 1001 packets in the queue, two marker packets will be added since the interval is every 1000 packets
+ for (int i = 0; i < 1001; i++) {
+ learnerHandler.queuePacket(new QuorumPacket());
+ }
+ learnerHandler.startSendingPackets();
+
+ String sidStr = Long.toString(sid);
+ String timeCountKey = "cnt_" + sidStr + "_learner_handler_qp_time_ms";
+
+ //the second marker is the last entry in the queue and its time is the last value recorded, so once two
+ //times have been recorded the whole queue has been processed; poll for that (for at most 30 seconds)
+ //instead of sleeping for a fixed period. An interrupt now fails the test instead of being swallowed.
+ long deadline = System.currentTimeMillis() + 30000L;
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ while (!Long.valueOf(2L).equals(values.get(timeCountKey)) && System.currentTimeMillis() < deadline) {
+ Thread.sleep(100);
+ values = MetricsUtils.currentServerMetrics();
+ }
+
+ //we record time for each marker packet and we have two marker packets
+ Assert.assertEquals(2L, values.get(timeCountKey));
+
+ //the second marker has 1000 packets in front of it and each takes 5 ms to send so the time in queue should be
+ //longer than 5*1000
+ Assert.assertThat((long)values.get("max_" + sidStr + "_learner_handler_qp_time_ms"), greaterThan(5000L));
+
+ //we send 1001 packets + 2 marker packets so the queue size is recorded 1003 times
+ Assert.assertEquals(1003L, values.get("cnt_" + sidStr + "_learner_handler_qp_size"));
+
+ //the longest queue size is recorded when we are sending the first packet
+ Assert.assertEquals(1002L, values.get("max_" + sidStr + "_learner_handler_qp_size"));
+
+ //this is when the queue is emptied
+ Assert.assertEquals(0L, values.get("min_" + sidStr + "_learner_handler_qp_size"));
+
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index 1cd33ec..c15a6bf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -61,6 +61,11 @@ public class LearnerHandlerTest extends ZKTestCase {
protected void startSendingPackets() {
threadStarted = true;
}
+
+ @Override
+ protected boolean shouldSendMarkerPacketForLogging() {
+ return false; // markers are only needed to log quorum packet delays, which these tests do not need
+ }
}
class MockZKDatabase extends ZKDatabase {