You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/28 13:15:23 UTC

[kafka] branch trunk updated: KAFKA-6809: Count inbound connections in the connection-creation metric (#5301)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new daa8082  KAFKA-6809: Count inbound connections in the connection-creation metric (#5301)
daa8082 is described below

commit daa8082d6718d39584ed6d25863af48acec2cea6
Author: Stanislav Kozlovski <fa...@windowslive.com>
AuthorDate: Thu Jun 28 14:15:15 2018 +0100

    KAFKA-6809: Count inbound connections in the connection-creation metric (#5301)
    
    Previously, the connection-creation metric only accounted for opened connections from the broker. This change extends it to account for received connections.
---
 .../org/apache/kafka/common/network/Selector.java  |  1 +
 .../apache/kafka/common/network/SelectorTest.java  | 55 ++++++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index a269f0f..8ca7fff 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -265,6 +265,7 @@ public class Selector implements Selectable, AutoCloseable {
     public void register(String id, SocketChannel socketChannel) throws IOException {
         ensureNotRegistered(id);
         registerChannel(id, socketChannel, SelectionKey.OP_READ);
+        this.sensors.connectionCreated.record();
     }
 
     private void ensureNotRegistered(String id) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 8ce8d50..3bb6244 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
@@ -49,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.Optional;
 
 import static org.easymock.EasyMock.createControl;
 import static org.easymock.EasyMock.expect;
@@ -580,6 +583,49 @@ public class SelectorTest {
         control.verify();
     }
 
+    @Test
+    public void testOutboundConnectionsCountInConnectionCreationMetric() throws Exception {
+        // create connections
+        int expectedConnections = 5;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        for (int i = 0; i < expectedConnections; i++)
+            connect(Integer.toString(i), addr);
+
+        // Poll continuously, as we cannot guarantee that the first call will see all connections
+        int seenConnections = 0;
+        for (int i = 0; i < 10; i++) {
+            selector.poll(100L);
+            seenConnections += selector.connected().size();
+            if (seenConnections == expectedConnections)
+                break;
+        }
+
+        assertEquals((double) expectedConnections, getMetric("connection-creation-total").metricValue());
+        assertEquals((double) expectedConnections, getMetric("connection-count").metricValue());
+    }
+
+    @Test
+    public void testInboundConnectionsCountInConnectionCreationMetric() throws Exception {
+        int conns = 5;
+
+        try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+            ss.bind(new InetSocketAddress(0));
+            InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+            for (int i = 0; i < conns; i++) {
+                Thread sender = createSender(serverAddress, randomPayload(1));
+                sender.start();
+                SocketChannel channel = ss.accept();
+                channel.configureBlocking(false);
+
+                selector.register(Integer.toString(i), channel);
+            }
+        }
+
+        assertEquals((double) conns, getMetric("connection-creation-total").metricValue());
+        assertEquals((double) conns, getMetric("connection-count").metricValue());
+    }
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
@@ -675,4 +721,13 @@ public class SelectorTest {
             assertTrue("Field not empty: " + field + " " + obj, ((Map<?, ?>) obj).isEmpty());
     }
 
+    private KafkaMetric getMetric(String name) throws Exception {
+        Optional<Map.Entry<MetricName, KafkaMetric>> metric = metrics.metrics().entrySet().stream()
+                .filter(entry -> entry.getKey().name().equals(name))
+                .findFirst();
+        if (!metric.isPresent())
+            throw new Exception(String.format("Could not find metric called %s", name));
+
+        return metric.get().getValue();
+    }
 }