You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/29 20:27:32 UTC

[kafka] branch trunk updated: KAFKA-6986: Export Admin Client metrics through Stream Threads (#5210)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d44d5d7  KAFKA-6986: Export Admin Client metrics through Stream Threads (#5210)
d44d5d7 is described below

commit d44d5d7520d31f33feb03b0679f8c552254dac4e
Author: Yishun Guan <gy...@gmail.com>
AuthorDate: Fri Jun 29 13:27:26 2018 -0700

    KAFKA-6986: Export Admin Client metrics through Stream Threads (#5210)
    
    KAFKA-6986:Export Admin Client metrics through Stream Threads
    
    We already exported producer and consumer metrics through KafkaStreams class:
    
    #4998
    
    It makes sense to also export the Admin client metrics.
    
    I didn't add a separate unittest case for this. Let me know if it's needed.
    
    This is my first contribution, feel free to point out any mistakes that I did.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/clients/admin/AdminClient.java    |  9 ++++
 .../kafka/clients/admin/KafkaAdminClient.java      |  7 +++
 .../kafka/clients/admin/MockAdminClient.java       | 13 ++++++
 .../org/apache/kafka/streams/KafkaStreams.java     |  2 +-
 .../streams/processor/internals/StreamThread.java  |  7 +++
 .../streams/processor/internals/TaskManager.java   |  4 ++
 .../processor/internals/StreamThreadTest.java      | 51 ++++++++++++++++++++++
 7 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 0171b61..75c93b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
@@ -768,4 +770,11 @@ public abstract class AdminClient implements AutoCloseable {
     public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
         return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
     }
+
+    /**
+     * Get the metrics kept by the adminClient
+     *
+     * @return
+     */
+    public abstract Map<MetricName, ? extends Metric> metrics();
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7e245d1..8ddb0c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -2749,4 +2751,9 @@ public class KafkaAdminClient extends AdminClient {
 
         return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures));
     }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 5175072..b5131ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
@@ -47,6 +49,8 @@ public class MockAdminClient extends AdminClient {
     private Node controller;
     private int timeoutNextRequests = 0;
 
+    private Map<MetricName, Metric> mockMetrics = new HashMap<>();
+
     /**
      * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from
      * DEFAULT_CLUSTER_ID.
@@ -390,4 +394,13 @@ public class MockAdminClient extends AdminClient {
             this.configs = configs != null ? configs : Collections.<String, String>emptyMap();
         }
     }
+
+    public void setMockMetrics(MetricName name, Metric metric) {
+        mockMetrics.put(name, metric);
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return mockMetrics;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6a707ff..cef8116 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -382,12 +382,12 @@ public class KafkaStreams {
      *
      * @return Map of all metrics.
      */
-    // TODO: we can add metrics for admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
         final Map<MetricName, Metric> result = new LinkedHashMap<>();
         for (final StreamThread thread : threads) {
             result.putAll(thread.producerMetrics());
             result.putAll(thread.consumerMetrics());
+            result.putAll(thread.adminClientMetrics());
         }
         if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
         result.putAll(metrics.metrics());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 77538ae..74ec9aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1276,4 +1276,11 @@ public class StreamThread extends Thread {
         result.putAll(restoreConsumerMetrics);
         return result;
     }
+
+    public Map<MetricName, Metric> adminClientMetrics() {
+        final Map<MetricName, ? extends Metric> adminClientMetrics = taskManager.getAdminClient().metrics();
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        result.putAll(adminClientMetrics);
+        return result;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 44db70d..9da2702 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -278,6 +278,10 @@ public class TaskManager {
         }
     }
 
+    AdminClient getAdminClient() {
+        return adminClient;
+    }
+
     Set<TaskId> suspendedActiveTaskIds() {
         return active.previousTaskIds();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 513d1c0..2ccc893 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -72,6 +73,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1303,4 +1305,53 @@ public class StreamThreadTest {
         Map<MetricName, Metric> producerMetrics = thread.producerMetrics();
         assertEquals(testMetricName, producerMetrics.get(testMetricName).metricName());
     }
+
+    @Test
+    public void adminClientMetricsVerification() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+
+        final MockAdminClient adminClient = new MockAdminClient(cluster, broker1, null);
+
+        final MockProducer<byte[], byte[]> producer = new MockProducer<>();
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread thread = new StreamThread(
+                mockTime,
+                config,
+                producer,
+                consumer,
+                consumer,
+                null,
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""),
+                new AtomicBoolean());
+        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
+        final Metric testMetric = new KafkaMetric(
+                new Object(),
+                testMetricName,
+                new Measurable() {
+                    @Override
+                    public double measure(MetricConfig config, long now) {
+                        return 0;
+                    }
+                },
+                null,
+                new MockTime());
+
+
+        EasyMock.expect(taskManager.getAdminClient()).andReturn(adminClient);
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager, consumer);
+
+        adminClient.setMockMetrics(testMetricName, testMetric);
+        Map<MetricName, Metric> adminClientMetrics = thread.adminClientMetrics();
+        assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName());
+    }
 }