You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/02 02:14:04 UTC

[kafka] branch 2.3 updated: KAFKA-8446: Kafka Streams restoration crashes with NPE when the record value is null (#6842)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 98f5c53  KAFKA-8446: Kafka Streams restoration crashes with NPE when the record value is null (#6842)
98f5c53 is described below

commit 98f5c538a7403dafdbbcb5476907a9859ec3876b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sat Jun 1 19:12:58 2019 -0700

    KAFKA-8446: Kafka Streams restoration crashes with NPE when the record value is null (#6842)
    
    When a restored record's value is null, converting it to the timestamped value format during the restoration phase throws a NullPointerException.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../streams/state/internals/RecordConverters.java  |  11 +-
 .../StateRestorationIntegrationTest.java           | 122 +++++++++++++++++++++
 .../state/internals/RecordConvertersTest.java      |  49 +++++++++
 3 files changed, 177 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index f65cc32..8305d52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -27,6 +27,11 @@ public final class RecordConverters {
     private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> {
         final byte[] rawValue = record.value();
         final long timestamp = record.timestamp();
+        final byte[] recordValue = rawValue == null ? null :
+            ByteBuffer.allocate(8 + rawValue.length)
+                .putLong(timestamp)
+                .put(rawValue)
+                .array();
         return new ConsumerRecord<>(
             record.topic(),
             record.partition(),
@@ -37,11 +42,7 @@ public final class RecordConverters {
             record.serializedKeySize(),
             record.serializedValueSize(),
             record.key(),
-            ByteBuffer
-                .allocate(8 + rawValue.length)
-                .putLong(timestamp)
-                .put(rawValue)
-                .array(),
+            recordValue,
             record.headers(),
             record.leaderEpoch()
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
new file mode 100644
index 0000000..e22ff4f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+@Category({IntegrationTest.class})
+public class StateRestorationIntegrationTest {
+    private StreamsBuilder builder = new StreamsBuilder();
+
+    private static final String APPLICATION_ID = "restoration-test-app";
+    private static final String STATE_STORE_NAME = "stateStore";
+    private static final String INPUT_TOPIC = "input";
+    private static final String OUTPUT_TOPIC = "output";
+
+    private Properties streamsConfiguration;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private final MockTime mockTime = CLUSTER.time;
+
+    @Before
+    public void setUp() throws Exception {
+        final Properties props = new Properties();
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+                APPLICATION_ID,
+                CLUSTER.bootstrapServers(),
+                Serdes.Integer().getClass().getName(),
+                Serdes.ByteArray().getClass().getName(),
+                props);
+
+        CLUSTER.createTopics(INPUT_TOPIC);
+        CLUSTER.createTopics(OUTPUT_TOPIC);
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldRestoreNullRecord() throws InterruptedException, ExecutionException {
+        builder.table(INPUT_TOPIC, Materialized.<Integer, Bytes>as(
+                Stores.persistentTimestampedKeyValueStore(STATE_STORE_NAME))
+                .withKeySerde(Serdes.Integer())
+                .withValueSerde(Serdes.Bytes())
+                .withCachingDisabled()).toStream().to(OUTPUT_TOPIC);
+
+        final Properties producerConfig = TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(), IntegerSerializer.class, BytesSerializer.class);
+
+        final List<KeyValue<Integer, Bytes>> initialKeyValues = Arrays.asList(
+                KeyValue.pair(3, new Bytes(new byte[]{3})),
+                KeyValue.pair(3, null),
+                KeyValue.pair(1, new Bytes(new byte[]{1})));
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+                INPUT_TOPIC, initialKeyValues, producerConfig, mockTime);
+
+        KafkaStreams streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
+        streams.start();
+
+        final Properties consumerConfig = TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(), IntegerDeserializer.class, BytesDeserializer.class);
+
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                consumerConfig, OUTPUT_TOPIC, initialKeyValues);
+
+        // wipe out state store to trigger restore process on restart
+        streams.close();
+        streams.cleanUp();
+
+        // Restart the stream instance. There should not be exception handling the null value within changelog topic.
+        final List<KeyValue<Integer, Bytes>> newKeyValues =
+                Collections.singletonList(KeyValue.pair(2, new Bytes(new byte[3])));
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+                INPUT_TOPIC, newKeyValues, producerConfig, mockTime);
+        streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
+        streams.start();
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                consumerConfig, OUTPUT_TOPIC, newKeyValues);
+        streams.close();
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
new file mode 100644
index 0000000..bacbacd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+public class RecordConvertersTest {
+
+    private final RecordConverter timestampedValueConverter = rawValueToTimestampedValue();
+
+    @Test
+    public void shouldPreserveNullValueOnConversion() {
+        final ConsumerRecord<byte[], byte[]> nullValueRecord = new ConsumerRecord<>("", 0, 0L, new byte[0], null);
+        assertNull(timestampedValueConverter.convert(nullValueRecord).value());
+    }
+
+    @Test
+    public void shouldAddTimestampToValueOnConversionWhenValueIsNotNull() {
+        final long timestamp = 10L;
+        final byte[] value = new byte[1];
+        final ConsumerRecord<byte[], byte[]> inputRecord = new ConsumerRecord<>(
+                "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], value);
+        final byte[] expectedValue = ByteBuffer.allocate(9).putLong(timestamp).put(value).array();
+        final byte[] actualValue = timestampedValueConverter.convert(inputRecord).value();
+        assertArrayEquals(expectedValue, actualValue);
+    }
+}