You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ru...@apache.org on 2020/04/10 00:32:50 UTC
[cassandra] 01/06: Add client request size metrics
This is an automated email from the ASF dual-hosted git repository.
rustyrazorblade pushed a commit to branch client-metrics
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 1fea9a507079cde5c9e87f8c70c8f33cb2c7486c
Author: Jon Haddad jon@jonhaddad.com <jo...@jonhaddad.com>
AuthorDate: Tue Apr 7 09:57:31 2020 -0700
Add client request size metrics
Patch by Jon Haddad for CASSANDRA-15704
---
.../metrics/ClientRequestSizeMetrics.java | 36 +++++++
.../transport/ClientRequestSizeMetricsHandler.java | 58 +++++++++++
.../org/apache/cassandra/transport/Server.java | 5 +
.../ClientRequestSizeMetricsHandlerTest.java | 112 +++++++++++++++++++++
4 files changed, 211 insertions(+)
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
new file mode 100644
index 0000000..cf26e55
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+
+/**
+ * Metrics tracking the size of incoming and outgoing bytes at the Cassandra server: two running byte counters and two per-query size histograms.
+ */
+public class ClientRequestSizeMetrics
+{
+ private static final String TYPE = "ClientRequestSize"; // type passed to createMetricName for every metric declared below
+ public static final Counter totalBytesRead = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "IncomingBytes", null)); // counter registered as "IncomingBytes"
+ public static final Counter totalBytesWritten = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "OutgoingBytes", null)); // counter registered as "OutgoingBytes"
+ public static final Histogram bytesReadPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesReadPerQuery", null), true); // NOTE(review): meaning of the trailing 'true' flag is defined by Metrics.histogram - confirm
+ public static final Histogram bytesWrittenPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesWrittenPerQuery", null), true); // write-side counterpart of the histogram above
+}
diff --git a/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java b/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java
new file mode 100644
index 0000000..33fe034
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageCodec;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
+
+/**
+ * Netty codec that measures client traffic: each inbound and outbound {@link ByteBuf} has its readable byte count
+ * added to the counters and histograms in {@link ClientRequestSizeMetrics} and is then forwarded unchanged.
+ * Forwarded buffers are retained first, because the codec base class releases each message once it has been handled.
+ */
+@ChannelHandler.Sharable
+public class ClientRequestSizeMetricsHandler extends MessageToMessageCodec<ByteBuf, ByteBuf>
+{
+    /** Records the readable bytes of an inbound buffer as bytes read, then passes the same buffer on via {@code results}. */
+    @Override
+    public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results)
+    {
+        final long inboundBytes = buf.readableBytes();
+        ClientRequestSizeMetrics.bytesReadPerQueryHistogram.update(inboundBytes);
+        ClientRequestSizeMetrics.totalBytesRead.inc(inboundBytes);
+        // retain() returns the buffer itself; the extra reference keeps it from being freed after this handler
+        results.add(buf.retain());
+    }
+
+    /** Records the readable bytes of an outbound buffer as bytes written, then passes the same buffer on via {@code results}. */
+    @Override
+    public void encode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results)
+    {
+        final long outboundBytes = buf.readableBytes();
+        ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.update(outboundBytes);
+        ClientRequestSizeMetrics.totalBytesWritten.inc(outboundBytes);
+        // retain() returns the buffer itself; the extra reference keeps it from being freed after this handler
+        results.add(buf.retain());
+    }
+}
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 43b024f..64a110b 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -416,6 +416,7 @@ public class Server implements CassandraDaemon.Server
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
+ private static final ClientRequestSizeMetricsHandler clientRequestSizeMetricsHandler = new ClientRequestSizeMetricsHandler();
private final Server server;
@@ -452,6 +453,10 @@ public class Server implements CassandraDaemon.Server
//pipeline.addLast("debug", new LoggingHandler());
+ // Handler to record the size of client requests and responses as metrics
+ // it must come after the connection limit handler so that handler can drop connections first
+ pipeline.addLast("requestMetricsHandler", clientRequestSizeMetricsHandler);
+
pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
pipeline.addLast("frameEncoder", frameEncoder);
diff --git a/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java
new file mode 100644
index 0000000..4d01763
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Ensures we properly account for bytes read from and written to clients
+ */
+public class ClientRequestSizeMetricsHandlerTest extends CQLTester
+{
+    private ClientRequestSizeMetricsHandler handler;
+    private ByteBufAllocator alloc;
+    private ByteBuf buf;
+    private List<Object> result;
+    private long totalBytesReadStart;
+    private long totalBytesWrittenStart;
+    // histogram baselines hold sample counts (number of updates), not byte totals
+    private long totalBytesReadHistoCount;
+    private long totalBytesWrittenHistoCount;
+
+    @Before
+    public void setUp()
+    {
+        handler = new ClientRequestSizeMetricsHandler();
+        alloc = PooledByteBufAllocator.DEFAULT;
+        buf = alloc.buffer(1024);
+        result = new LinkedList<>();
+        buf.writeInt(1); // gives the buffer exactly Integer.BYTES readable bytes
+        // the metrics are static and shared, so snapshot them and assert relative to these baselines
+        totalBytesReadStart = ClientRequestSizeMetrics.totalBytesRead.getCount();
+        totalBytesWrittenStart = ClientRequestSizeMetrics.totalBytesWritten.getCount();
+
+        totalBytesReadHistoCount = ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount();
+        totalBytesWrittenHistoCount = ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount();
+    }
+
+    @Test
+    public void testReadAndWriteMetricsAreRecordedDuringNativeRequests() throws Throwable
+    {
+        executeNet("SELECT * from system.peers");
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isGreaterThan(totalBytesReadStart);
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isGreaterThan(totalBytesWrittenStart);
+        // a histogram's count is its number of samples, so compare it with the histogram baseline, not a byte total
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isGreaterThan(totalBytesReadHistoCount);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isGreaterThan(totalBytesWrittenHistoCount);
+    }
+
+    /**
+     * Ensures we work with the right metrics within the ClientRequestSizeMetricsHandler
+     */
+    @Test
+    public void testBytesRead()
+    {
+        int beforeRefCount = buf.refCnt();
+        handler.decode(null, buf, result);
+
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart + Integer.BYTES);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount + 1);
+
+        // make sure we didn't touch the write metrics
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount);
+
+        // we should have incremented the reference count (netty ByteBuf requirement)
+        assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1);
+    }
+
+    @Test
+    public void testBytesWritten()
+    {
+        int beforeRefCount = buf.refCnt();
+        handler.encode(null, buf, result);
+
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart + Integer.BYTES);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount + 1);
+
+        // make sure we didn't touch the read metrics
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount);
+
+        assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org