You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/28 13:15:23 UTC
[kafka] branch trunk updated: KAFKA-6809: Count inbound connections in the connection-creation metric (#5301)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new daa8082 KAFKA-6809: Count inbound connections in the connection-creation metric (#5301)
daa8082 is described below
commit daa8082d6718d39584ed6d25863af48acec2cea6
Author: Stanislav Kozlovski <fa...@windowslive.com>
AuthorDate: Thu Jun 28 14:15:15 2018 +0100
KAFKA-6809: Count inbound connections in the connection-creation metric (#5301)
Previously, the connection-creation metric only counted connections opened by the broker (outbound connections). This change extends it to also count connections the broker receives (inbound connections).
---
.../org/apache/kafka/common/network/Selector.java | 1 +
.../apache/kafka/common/network/SelectorTest.java | 55 ++++++++++++++++++++++
2 files changed, 56 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index a269f0f..8ca7fff 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -265,6 +265,7 @@ public class Selector implements Selectable, AutoCloseable {
public void register(String id, SocketChannel socketChannel) throws IOException {
ensureNotRegistered(id);
registerChannel(id, socketChannel, SelectionKey.OP_READ);
+ this.sensors.connectionCreated.record();
}
private void ensureNotRegistered(String id) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 8ce8d50..3bb6244 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -17,8 +17,10 @@
package org.apache.kafka.common.network;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
@@ -49,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.Optional;
import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.expect;
@@ -580,6 +583,49 @@ public class SelectorTest {
control.verify();
}
+ @Test
+ public void testOutboundConnectionsCountInConnectionCreationMetric() throws Exception {
+ // create connections
+ int expectedConnections = 5;
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ for (int i = 0; i < expectedConnections; i++)
+ connect(Integer.toString(i), addr);
+
+ // Poll continuously, as we cannot guarantee that the first call will see all connections
+ int seenConnections = 0;
+ for (int i = 0; i < 10; i++) {
+ selector.poll(100L);
+ seenConnections += selector.connected().size();
+ if (seenConnections == expectedConnections)
+ break;
+ }
+
+ assertEquals((double) expectedConnections, getMetric("connection-creation-total").metricValue());
+ assertEquals((double) expectedConnections, getMetric("connection-count").metricValue());
+ }
+
+ @Test
+ public void testInboundConnectionsCountInConnectionCreationMetric() throws Exception {
+ int conns = 5;
+
+ try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+ ss.bind(new InetSocketAddress(0));
+ InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+ for (int i = 0; i < conns; i++) {
+ Thread sender = createSender(serverAddress, randomPayload(1));
+ sender.start();
+ SocketChannel channel = ss.accept();
+ channel.configureBlocking(false);
+
+ selector.register(Integer.toString(i), channel);
+ }
+ }
+
+ assertEquals((double) conns, getMetric("connection-creation-total").metricValue());
+ assertEquals((double) conns, getMetric("connection-count").metricValue());
+ }
+
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);
@@ -675,4 +721,13 @@ public class SelectorTest {
assertTrue("Field not empty: " + field + " " + obj, ((Map<?, ?>) obj).isEmpty());
}
+ private KafkaMetric getMetric(String name) throws Exception {
+ Optional<Map.Entry<MetricName, KafkaMetric>> metric = metrics.metrics().entrySet().stream()
+ .filter(entry -> entry.getKey().name().equals(name))
+ .findFirst();
+ if (!metric.isPresent())
+ throw new Exception(String.format("Could not find metric called %s", name));
+
+ return metric.get().getValue();
+ }
}