You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/06/05 20:55:03 UTC

[kafka] 01/02: KAFKA-10110: Corrected potential NPE when null label value added to KafkaMetricsContext (#8811)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c5333d52019f06991e2abec5b50094f3633fb2e2
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Fri Jun 5 15:19:23 2020 -0500

    KAFKA-10110: Corrected potential NPE when null label value added to KafkaMetricsContext (#8811)
    
    Also added a new unit test to verify the functionality and expectations.
    
    Author: Randall Hauch <rh...@gmail.com>
    Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/common/metrics/KafkaMetricsContext.java  |  2 +-
 .../common/metrics/KafkaMetricsContextTest.java    | 89 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
index 2fe73de..43eb8cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
@@ -45,7 +45,7 @@ public class KafkaMetricsContext implements MetricsContext {
      */
     public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) {
         this.contextLabels.put(MetricsContext.NAMESPACE, namespace);
-        contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString()));
+        contextLabels.forEach((key, value) -> this.contextLabels.put(key, value != null ? value.toString() : null));
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricsContextTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricsContextTest.java
new file mode 100644
index 0000000..55b24f8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricsContextTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class KafkaMetricsContextTest {
+
+    private static final String NAMESPACE_VALUE = "sample-ns";
+    private static final String KEY_A = "label-a";
+    private static final String VALUE_A = "label-a-value";
+
+    // Constructor inputs; rebuilt before each test so that no case sees another's mutations.
+    private String namespace;
+    private Map<String, String> labels;
+
+    @Before
+    public void beforeEach() {
+        labels = new HashMap<>();
+        labels.put(KEY_A, VALUE_A);
+        namespace = NAMESPACE_VALUE;
+    }
+
+    // With an empty label map the namespace is the only entry.
+    @Test
+    public void testCreationWithValidNamespaceAndNoLabels() {
+        labels.clear();
+        final KafkaMetricsContext created = new KafkaMetricsContext(namespace, labels);
+        assertEquals(1, created.contextLabels().size());
+        assertEquals(namespace, created.contextLabels().get(MetricsContext.NAMESPACE));
+    }
+
+    // Supplied labels are exposed next to the namespace entry.
+    @Test
+    public void testCreationWithValidNamespaceAndLabels() {
+        final KafkaMetricsContext created = new KafkaMetricsContext(namespace, labels);
+        assertEquals(2, created.contextLabels().size());
+        assertEquals(namespace, created.contextLabels().get(MetricsContext.NAMESPACE));
+        assertEquals(VALUE_A, created.contextLabels().get(KEY_A));
+    }
+
+    // A null label value must not throw; the key is kept and maps to null.
+    @Test
+    public void testCreationWithValidNamespaceAndNullLabelValues() {
+        labels.put(KEY_A, null);
+        final KafkaMetricsContext created = new KafkaMetricsContext(namespace, labels);
+        assertEquals(2, created.contextLabels().size());
+        assertEquals(namespace, created.contextLabels().get(MetricsContext.NAMESPACE));
+        assertNull(created.contextLabels().get(KEY_A));
+    }
+
+    // A null namespace is accepted and stored as a null value under the namespace key.
+    @Test
+    public void testCreationWithNullNamespaceAndLabels() {
+        final KafkaMetricsContext created = new KafkaMetricsContext(null, labels);
+        assertEquals(2, created.contextLabels().size());
+        assertNull(created.contextLabels().get(MetricsContext.NAMESPACE));
+        assertEquals(VALUE_A, created.contextLabels().get(KEY_A));
+    }
+
+    // The map handed out by contextLabels() rejects mutation.
+    @Test
+    public void testKafkaMetricsContextLabelsAreImmutable() {
+        final KafkaMetricsContext created = new KafkaMetricsContext(namespace, labels);
+        assertThrows(UnsupportedOperationException.class, () -> created.contextLabels().clear());
+    }
+}
\ No newline at end of file