You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/22 07:15:06 UTC
[kafka] branch 2.1 updated: KAFKA-8142: Fix NPE for nulls in Headers (#6484)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 2e3cfb1 KAFKA-8142: Fix NPE for nulls in Headers (#6484)
2e3cfb1 is described below
commit 2e3cfb1b6bf2253c8ca1f0cca33ddd50df327a61
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Mar 21 23:53:56 2019 -0700
KAFKA-8142: Fix NPE for nulls in Headers (#6484)
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../common/header/internals/RecordHeaders.java | 12 +--
.../common/header/internals/RecordHeadersTest.java | 21 +++--
.../internals/ProcessorRecordContext.java | 5 +-
.../internals/ProcessorRecordContextTest.java | 98 ++++++++++++++++++++++
4 files changed, 122 insertions(+), 14 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
index 577e758..5801bed 100644
--- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -16,16 +16,17 @@
*/
package org.apache.kafka.common.header.internals;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.AbstractIterator;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.utils.AbstractIterator;
+import java.util.Objects;
public class RecordHeaders implements Headers {
@@ -61,6 +62,7 @@ public class RecordHeaders implements Headers {
@Override
public Headers add(Header header) throws IllegalStateException {
+ Objects.requireNonNull(header, "Header cannot be null.");
canWrite();
headers.add(header);
return this;
diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
index 39c1c9c..5b9f95e 100644
--- a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -16,19 +16,19 @@
*/
package org.apache.kafka.common.header.internals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class RecordHeadersTest {
@@ -206,6 +206,11 @@ public class RecordHeadersTest {
assertEquals(2, getCount(newHeaders));
}
+    // Adding a null header must fail fast with an NPE rather than storing the null.
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNpeWhenAddingNullHeader() {
+        final Headers emptyHeaders = new RecordHeaders();
+        emptyHeaders.add((Header) null);
+    }
+
private int getCount(Headers headers) {
int count = 0;
Iterator<Header> headerIterator = headers.iterator();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index cd4657b..da44e96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -90,7 +90,10 @@ public class ProcessorRecordContext implements RecordContext {
if (headers != null) {
for (final Header header : headers) {
size += header.key().toCharArray().length;
- size += header.value().length;
+ final byte[] value = header.value();
+ if (value != null) {
+ size += value.length;
+ }
}
}
return size;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
new file mode 100644
index 0000000..1ea646f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the size estimate reported by {@link ProcessorRecordContext#sizeBytes()}.
+ *
+ * <p>Each test builds a context with a fixed timestamp/offset/partition triple and varies
+ * only the topic and headers, so the expected size is {@link #MIN_SIZE} plus whatever the
+ * topic and headers contribute. A {@code null} topic, {@code null} headers, empty headers,
+ * and a {@code null} header value are all expected to contribute zero bytes.
+ */
+public class ProcessorRecordContextTest {
+    // timestamp + offset + partition: 8 + 8 + 4
+    private static final long MIN_SIZE = 20L;
+
+    @Test
+    public void shouldEstimateNullTopicAndNullHeadersAsZeroLength() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(
+            42L,
+            73L,
+            0,
+            null,
+            null
+        );
+
+        assertEquals(MIN_SIZE, context.sizeBytes());
+    }
+
+    @Test
+    public void shouldEstimateEmptyHeaderAsZeroLength() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(
+            42L,
+            73L,
+            0,
+            null,
+            new RecordHeaders()
+        );
+
+        assertEquals(MIN_SIZE, context.sizeBytes());
+    }
+
+    @Test
+    public void shouldEstimateTopicLength() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(
+            42L,
+            73L,
+            0,
+            "topic",
+            null
+        );
+
+        // "topic" is 5 characters long
+        assertEquals(MIN_SIZE + 5L, context.sizeBytes());
+    }
+
+    @Test
+    public void shouldEstimateHeadersLength() {
+        final Headers headers = new RecordHeaders();
+        // Encode with an explicit charset so the 12-byte expectation below does not
+        // depend on the platform default encoding.
+        headers.add("header-key", "header-value".getBytes(StandardCharsets.UTF_8));
+        final ProcessorRecordContext context = new ProcessorRecordContext(
+            42L,
+            73L,
+            0,
+            null,
+            headers
+        );
+
+        // key "header-key" is 10 characters, value "header-value" is 12 bytes
+        assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes());
+    }
+
+    @Test
+    public void shouldEstimateNullValueInHeaderAsZero() {
+        final Headers headers = new RecordHeaders();
+        headers.add("header-key", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(
+            42L,
+            73L,
+            0,
+            null,
+            headers
+        );
+
+        // only the 10-character key counts; a null value must not throw or add bytes
+        assertEquals(MIN_SIZE + 10L, context.sizeBytes());
+    }
+}