You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ru...@apache.org on 2020/04/10 00:32:50 UTC

[cassandra] 01/06: Add client request size metrics

This is an automated email from the ASF dual-hosted git repository.

rustyrazorblade pushed a commit to branch client-metrics
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 1fea9a507079cde5c9e87f8c70c8f33cb2c7486c
Author: Jon Haddad jon@jonhaddad.com <jo...@jonhaddad.com>
AuthorDate: Tue Apr 7 09:57:31 2020 -0700

    Add client request size metrics
    
    Patch by Jon Haddad for CASSANDRA-15704
---
 .../metrics/ClientRequestSizeMetrics.java          |  36 +++++++
 .../transport/ClientRequestSizeMetricsHandler.java |  58 +++++++++++
 .../org/apache/cassandra/transport/Server.java     |   5 +
 .../ClientRequestSizeMetricsHandlerTest.java       | 112 +++++++++++++++++++++
 4 files changed, 211 insertions(+)

diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
new file mode 100644
index 0000000..cf26e55
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
@@ -0,0 +1,36 @@
+/*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.cassandra.metrics;
+
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+
+/** Metrics to track the size of incoming and outgoing bytes at Cassandra server. */
+public class ClientRequestSizeMetrics
+{
+    private static final String TYPE = "ClientRequestSize";
+    // running byte totals; ClientRequestSizeMetricsHandler adds each buffer's readable bytes on decode / encode
+    public static final Counter totalBytesRead = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "IncomingBytes", null));
+    public static final Counter totalBytesWritten = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "OutgoingBytes", null));
+    // size distributions; the handler records one sample per buffer passed to its decode / encode
+    public static final Histogram bytesReadPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesReadPerQuery", null), true);
+    public static final Histogram bytesWrittenPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesWrittenPerQuery", null), true);
+}
diff --git a/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java b/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java
new file mode 100644
index 0000000..33fe034
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java
@@ -0,0 +1,58 @@
+/*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.cassandra.transport;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageCodec;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
+
+/**
+ * Measures client traffic by adding the readable bytes of every buffer that passes through, in either direction,
+ * to {@link ClientRequestSizeMetrics}; the buffer itself is forwarded unchanged. The handler has no fields, so one
+ * instance can be shared. Netty's MessageToMessageCodec releases each message after decode/encode, hence retain().
+ */
+@ChannelHandler.Sharable
+public class ClientRequestSizeMetricsHandler extends MessageToMessageCodec<ByteBuf, ByteBuf>
+{
+    @Override
+    public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results)
+    {
+        // readableBytes() is writerIndex - readerIndex, i.e. the bytes carried by this inbound buffer
+        final long inboundBytes = buf.readableBytes();
+        ClientRequestSizeMetrics.bytesReadPerQueryHistogram.update(inboundBytes);
+        ClientRequestSizeMetrics.totalBytesRead.inc(inboundBytes);
+        // retain() offsets the release the codec performs after this call, so the buffer stays valid downstream
+        results.add(buf.retain());
+    }
+
+    @Override
+    public void encode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results)
+    {
+        // same accounting as decode(), applied to the outbound buffer and the write metrics
+        final long outboundBytes = buf.readableBytes();
+        ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.update(outboundBytes);
+        ClientRequestSizeMetrics.totalBytesWritten.inc(outboundBytes);
+        // retained for the same reason as in decode(): the codec releases the message once encode() returns
+        results.add(buf.retain());
+    }
+}
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 43b024f..64a110b 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -416,6 +416,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
         private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
+        private static final ClientRequestSizeMetricsHandler clientRequestSizeMetricsHandler = new ClientRequestSizeMetricsHandler();
 
         private final Server server;
 
@@ -452,6 +453,10 @@ public class Server implements CassandraDaemon.Server
 
             //pipeline.addLast("debug", new LoggingHandler());
 
+            // Handler that records the size of client requests and responses as metrics.
+            // It must come after the connection limit handler so that handler can drop connections first.
+            pipeline.addLast("requestMetricsHandler", clientRequestSizeMetricsHandler);
+
             pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
             pipeline.addLast("frameEncoder", frameEncoder);
 
diff --git a/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java
new file mode 100644
index 0000000..4d01763
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Ensures we properly account for bytes read from and written to clients.
+ */
+public class ClientRequestSizeMetricsHandlerTest extends CQLTester
+{
+    private ClientRequestSizeMetricsHandler handler;
+    private ByteBufAllocator alloc;
+    private ByteBuf buf;
+    private List<Object> result;
+    private long totalBytesReadStart;
+    private long totalBytesWrittenStart;
+    // histogram baselines are sample counts (number of recorded updates), not byte totals
+    private long totalBytesReadHistoCount;
+    private long totalBytesWrittenHistoCount;
+
+    @Before
+    public void setUp()
+    {
+        handler = new ClientRequestSizeMetricsHandler();
+        alloc = PooledByteBufAllocator.DEFAULT;
+        buf = alloc.buffer(1024);
+        result = new LinkedList<>();
+        buf.writeInt(1);
+        // the metrics are static, so snapshot them here and assert on deltas in the tests
+        totalBytesReadStart = ClientRequestSizeMetrics.totalBytesRead.getCount();
+        totalBytesWrittenStart = ClientRequestSizeMetrics.totalBytesWritten.getCount();
+        totalBytesReadHistoCount = ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount();
+        totalBytesWrittenHistoCount = ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount();
+    }
+
+    /** Ensures a query sent through executeNet moves every read and write metric past its own baseline. */
+    @Test
+    public void testReadAndWriteMetricsAreRecordedDuringNativeRequests() throws Throwable
+    {
+        executeNet("SELECT * from system.peers");
+        // counters hold bytes while histograms hold sample counts: compare each with its own baseline
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isGreaterThan(totalBytesReadStart);
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isGreaterThan(totalBytesWrittenStart);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isGreaterThan(totalBytesReadHistoCount);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isGreaterThan(totalBytesWrittenHistoCount);
+    }
+
+    /** Ensures decode() records the buffer's readable bytes in the read metrics only and retains the buffer. */
+    @Test
+    public void testBytesRead()
+    {
+        int beforeRefCount = buf.refCnt();
+        handler.decode(null, buf, result);
+
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart + Integer.BYTES);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount + 1);
+
+        // make sure we didn't touch the write metrics
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount);
+
+        // we should have incremented the reference count (netty ByteBuf requirement)
+        assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1);
+    }
+
+    /** Ensures encode() records the buffer's readable bytes in the write metrics only and retains the buffer. */
+    @Test
+    public void testBytesWritten()
+    {
+        int beforeRefCount = buf.refCnt();
+        handler.encode(null, buf, result);
+
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart + Integer.BYTES);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount + 1);
+
+        // make sure we didn't touch the read metrics
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount);
+
+        // encode() must add one reference as well, exactly as decode() does
+        assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org