You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/15 21:30:00 UTC

[jira] [Commented] (KAFKA-6896) add producer metrics exporting in KafkaStreams.java

    [ https://issues.apache.org/jira/browse/KAFKA-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476497#comment-16476497 ] 

ASF GitHub Bot commented on KAFKA-6896:
---------------------------------------

guozhangwang closed pull request #4998: KAFKA-6896: Add producer metrics exporting in KafkaStreams.java
URL: https://github.com/apache/kafka/pull/4998
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index ddf5b887205..9e9869a7e48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -32,7 +32,6 @@
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
@@ -70,6 +69,7 @@
     private boolean producerFenced;
     private boolean sentOffsets;
     private long commitCount = 0L;
+    private Map<MetricName, Metric> mockMetrics;
 
     /**
      * Create a mock producer
@@ -99,6 +99,7 @@ public MockProducer(final Cluster cluster,
         this.consumerGroupOffsets = new ArrayList<>();
         this.uncommittedConsumerGroupOffsets = new HashMap<>();
         this.completions = new ArrayDeque<>();
+        this.mockMetrics = new HashMap<>();
     }
 
     /**
@@ -293,7 +294,14 @@ public synchronized void flush() {
     }
 
     public Map<MetricName, Metric> metrics() {
-        return Collections.emptyMap();
+        return mockMetrics;
+    }
+
+    /**
+     * Set a mock metric for testing purpose
+     */
+    public void setMockMetrics(MetricName name, Metric metric) {
+        mockMetrics.put(name, metric);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d6561818e24..77a68c17803 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -383,10 +383,11 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
      *
      * @return Map of all metrics.
      */
-    // TODO: we can add metrics for producer and admin client as well
+    // TODO: we can add metrics for admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
         final Map<MetricName, Metric> result = new LinkedHashMap<>();
         for (final StreamThread thread : threads) {
+            result.putAll(thread.producerMetrics());
             result.putAll(thread.consumerMetrics());
         }
         if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b9753249eae..633e7ad295b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -742,4 +742,8 @@ boolean commitNeeded() {
     RecordCollector recordCollector() {
         return recordCollector;
     }
+
+    Producer<byte[], byte[]> getProducer() {
+        return producer;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 32993f6b0be..ddefd9ce0a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -569,6 +569,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
 
     // package-private for testing
     final ConsumerRebalanceListener rebalanceListener;
+    final Producer<byte[], byte[]> producer;
     final Consumer<byte[], byte[]> restoreConsumer;
     final Consumer<byte[], byte[]> consumer;
     final InternalTopologyBuilder builder;
@@ -658,6 +659,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         return new StreamThread(
             time,
             config,
+            threadProducer,
             restoreConsumer,
             consumer,
             originalReset,
@@ -670,6 +672,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
 
     public StreamThread(final Time time,
                         final StreamsConfig config,
+                        final Producer<byte[], byte[]> producer,
                         final Consumer<byte[], byte[]> restoreConsumer,
                         final Consumer<byte[], byte[]> consumer,
                         final String originalReset,
@@ -690,6 +693,7 @@ public StreamThread(final Time time,
         this.log = logContext.logger(StreamThread.class);
         this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
         this.taskManager = taskManager;
+        this.producer = producer;
         this.restoreConsumer = restoreConsumer;
         this.consumer = consumer;
         this.originalReset = originalReset;
@@ -1217,6 +1221,25 @@ TaskManager taskManager() {
         return standbyRecords;
     }
 
+    public Map<MetricName, Metric> producerMetrics() {
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        if (producer != null) {
+            final Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
+            if (producerMetrics != null) {
+                result.putAll(producerMetrics);
+            }
+        } else {
+            // When EOS is turned on, each task will has its own producer client
+            // and the producer object passed in here will be null. We would then iterate through
+            // all the active tasks and add their metrics to the output metrics map.
+            for (StreamTask task: taskManager.activeTasks().values()) {
+                final Map<MetricName, ? extends Metric> taskProducerMetrics = task.getProducer().metrics();
+                result.putAll(taskProducerMetrics);
+            }
+        }
+        return result;
+    }
+
     public Map<MetricName, Metric> consumerMetrics() {
         final Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
         final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5bc1934e151..749d618bac5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -29,7 +29,11 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -286,6 +290,7 @@ public void shouldNotCommitBeforeTheCommitInterval() {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -318,6 +323,7 @@ public void shouldNotCauseExceptionIfNothingCommitted() {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -350,6 +356,7 @@ public void shouldCommitAfterTheCommitInterval() {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -496,6 +503,7 @@ public void shouldShutdownTaskManagerOnClose() {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -531,6 +539,7 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -557,6 +566,7 @@ public void shouldOnlyShutdownOnce() {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -1242,6 +1252,40 @@ private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata met
         assertTrue(metadata.standbyTasks().isEmpty());
     }
 
+    @Test
+    // TODO: Need to add a test case covering EOS when we create a mock taskManager class
+    public void producerMetricsVerificationWithoutEOS() {
+        final MockProducer<byte[], byte[]> producer = new MockProducer();
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread thread = new StreamThread(
+                mockTime,
+                config,
+                producer,
+                consumer,
+                consumer,
+                null,
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
+        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
+        final Metric testMetric = new KafkaMetric(
+                new Object(),
+                testMetricName,
+                new Measurable() {
+                    @Override
+                    public double measure(MetricConfig config, long now) {
+                        return 0;
+                    }
+                },
+                null,
+                new MockTime());
+        producer.setMockMetrics(testMetricName, testMetric);
+        Map<MetricName, Metric> producerMetrics = thread.producerMetrics();
+        assertEquals(testMetricName, producerMetrics.get(testMetricName).metricName());
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> add producer metrics exporting in KafkaStreams.java
> ---------------------------------------------------
>
>                 Key: KAFKA-6896
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6896
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>             Fix For: 2.0.0
>
>
> We would like to also export the producer metrics from {{StreamThread}} just like consumer metrics, so that we could gain more visibility of stream application. The approach is to pass in the \{{threadProducer}}into the StreamThread so that we could export its metrics in dynamic.
> Note that this is a pure internal change that doesn't require a KIP, and in the future we also want to export admin client metrics. A followup KIP for admin client will be created once this is merged.
> Pull request here: https://github.com/apache/kafka/pull/4998



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)