You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/08/23 22:00:20 UTC

[kafka] branch 3.0 updated: HOTFIX: Fix null pointer when getting metric value in MetricsReporter (#11248)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7cc2b99  HOTFIX: Fix null pointer when getting metric value in MetricsReporter (#11248)
7cc2b99 is described below

commit 7cc2b9925bdf1563a32a9347f797bebc8fc3839a
Author: Phil Hardwick <ph...@gmail.com>
AuthorDate: Mon Aug 23 21:21:38 2021 +0100

    HOTFIX: Fix null pointer when getting metric value in MetricsReporter (#11248)
    
    The alive stream threads metric relies on the threads field as a monitor object for
    its synchronized block. When the alive stream threads metric is registered, the
    threads field isn't initialised yet, so any call to get the metric value before the
    field is initialised will result in a null pointer exception.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Walker Carlson <wc...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   2 +-
 .../MetricsReporterIntegrationTest.java            | 135 +++++++++++++++++++++
 2 files changed, 136 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b508396..c53c532 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -861,6 +861,7 @@ public class KafkaStreams implements AutoCloseable {
         ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
+        threads = Collections.synchronizedList(new LinkedList<>());
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads());
 
         streamsMetadataState = new StreamsMetadataState(
@@ -893,7 +894,6 @@ public class KafkaStreams implements AutoCloseable {
             globalThreadState = globalStreamThread.state();
         }
 
-        threads = Collections.synchronizedList(new LinkedList<>());
         threadState = new HashMap<>(numStreamThreads);
         streamStateListener = new StreamStateListener(threadState, globalThreadState);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
new file mode 100644
index 0000000..a7c925a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category({IntegrationTest.class})
+public class MetricsReporterIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    // topic names
+    private static final String STREAM_INPUT = "STREAM_INPUT";
+    private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
+
+    private StreamsBuilder builder;
+    private Properties streamsConfiguration;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws InterruptedException {
+        builder = new StreamsBuilder();
+
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String appId = "app-" + safeTestName;
+
+        streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricReporterImpl.class.getName());
+    }
+
+    final static Map<String, Object> METRIC_NAME_TO_INITIAL_VALUE = new HashMap<>();
+
+    public static class MetricReporterImpl implements MetricsReporter {
+
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+        }
+
+        @Override
+        public void init(final List<KafkaMetric> metrics) {
+        }
+
+        @Override
+        public void metricChange(final KafkaMetric metric) {
+            // get value of metric, e.g. if you wanted checking the type of the value
+            METRIC_NAME_TO_INITIAL_VALUE.put(metric.metricName().name(), metric.metricValue());
+        }
+
+        @Override
+        public void metricRemoval(final KafkaMetric metric) {
+        }
+
+        @Override
+        public void close() {
+        }
+
+    }
+
+    @Test
+    public void shouldBeAbleToProvideInitialMetricValueToMetricsReporter() {
+        // no need to create the topics, because we don't start the stream - just need to create the KafkaStreams object
+        // to check all initial values from the metrics are not null
+        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+                .to(STREAM_OUTPUT, Produced.with(Serdes.Integer(), Serdes.String()));
+        final Topology topology = builder.build();
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+
+        kafkaStreams.metrics().keySet().forEach(metricName -> {
+            final Object initialMetricValue = METRIC_NAME_TO_INITIAL_VALUE.get(metricName.name());
+            assertThat(initialMetricValue, notNullValue());
+        });
+    }
+
+}
\ No newline at end of file