You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/22 07:15:06 UTC

[kafka] branch 2.1 updated: KAFKA-8142: Fix NPE for nulls in Headers (#6484)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2e3cfb1  KAFKA-8142: Fix NPE for nulls in Headers (#6484)
2e3cfb1 is described below

commit 2e3cfb1b6bf2253c8ca1f0cca33ddd50df327a61
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Mar 21 23:53:56 2019 -0700

    KAFKA-8142: Fix NPE for nulls in Headers (#6484)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../common/header/internals/RecordHeaders.java     | 12 +--
 .../common/header/internals/RecordHeadersTest.java | 21 +++--
 .../internals/ProcessorRecordContext.java          |  5 +-
 .../internals/ProcessorRecordContextTest.java      | 98 ++++++++++++++++++++++
 4 files changed, 122 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
index 577e758..5801bed 100644
--- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -16,16 +16,17 @@
  */
 package org.apache.kafka.common.header.internals;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.AbstractIterator;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.utils.AbstractIterator;
+import java.util.Objects;
 
 public class RecordHeaders implements Headers {
 
@@ -61,6 +62,7 @@ public class RecordHeaders implements Headers {
 
     @Override
     public Headers add(Header header) throws IllegalStateException {
+        Objects.requireNonNull(header, "Header cannot be null.");
         canWrite();
         headers.add(header);
         return this;
diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
index 39c1c9c..5b9f95e 100644
--- a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -16,19 +16,19 @@
  */
 package org.apache.kafka.common.header.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class RecordHeadersTest {
 
@@ -206,6 +206,11 @@ public class RecordHeadersTest {
         assertEquals(2, getCount(newHeaders));
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNpeWhenAddingNullHeader() {
+        new RecordHeaders().add(null); // adding a null header is expected to throw NullPointerException
+    }
+
     private int getCount(Headers headers) {
         int count = 0;
         Iterator<Header> headerIterator = headers.iterator();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index cd4657b..da44e96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -90,7 +90,10 @@ public class ProcessorRecordContext implements RecordContext {
         if (headers != null) {
             for (final Header header : headers) {
                 size += header.key().toCharArray().length;
-                size += header.value().length;
+                final byte[] value = header.value();
+                if (value != null) {
+                    size += value.length;
+                }
             }
         }
         return size;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
new file mode 100644
index 0000000..1ea646f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the size estimate returned by {@link ProcessorRecordContext#sizeBytes()}.
+ * Each case varies only the topic and the headers; the numeric arguments stay fixed.
+ */
+public class ProcessorRecordContextTest {
+    // timestamp + offset + partition: 8 + 8 + 4
+    private static final long MIN_SIZE = 20L;
+
+    /**
+     * Builds the context under test.
+     *
+     * @param topic   topic name handed to the constructor; may be {@code null}
+     * @param headers headers handed to the constructor; may be {@code null}
+     * @return a context created with fixed numeric arguments and the given topic and headers
+     */
+    private static ProcessorRecordContext contextWith(final String topic, final Headers headers) {
+        return new ProcessorRecordContext(42L, 73L, 0, topic, headers);
+    }
+
+    /**
+     * With neither a topic nor headers, only the fixed-size fields are counted.
+     */
+    @Test
+    public void shouldEstimateNullTopicAndNullHeadersAsZeroLength() {
+        final ProcessorRecordContext context = contextWith(null, null);
+
+        assertEquals(MIN_SIZE, context.sizeBytes());
+    }
+
+    /**
+     * An empty, non-null header collection adds nothing to the estimate.
+     */
+    @Test
+    public void shouldEstimateEmptyHeaderAsZeroLength() {
+        final ProcessorRecordContext context = contextWith(null, new RecordHeaders());
+
+        assertEquals(MIN_SIZE, context.sizeBytes());
+    }
+
+    /**
+     * The topic contributes its length: {@code "topic"} adds 5.
+     */
+    @Test
+    public void shouldEstimateTopicLength() {
+        final ProcessorRecordContext context = contextWith("topic", null);
+
+        assertEquals(MIN_SIZE + 5L, context.sizeBytes());
+    }
+
+    /**
+     * A header contributes its key length (10) plus its value length (12).
+     */
+    @Test
+    public void shouldEstimateHeadersLength() {
+        final Headers headers = new RecordHeaders();
+        // Explicit charset keeps the value length independent of the platform default.
+        headers.add("header-key", "header-value".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        final ProcessorRecordContext context = contextWith(null, headers);
+
+        assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes());
+    }
+
+    /**
+     * A header whose value is {@code null} contributes only its key length (10).
+     */
+    @Test
+    public void shouldEstimateNullValueInHeaderAsZero() {
+        final Headers headers = new RecordHeaders();
+        headers.add("header-key", null);
+        final ProcessorRecordContext context = contextWith(null, headers);
+
+        assertEquals(MIN_SIZE + 10L, context.sizeBytes());
+    }
+}