You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/02 02:14:04 UTC
[kafka] branch 2.3 updated: KAFKA-8446: Kafka Streams restoration
crashes with NPE when the record value is null (#6842)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 98f5c53 KAFKA-8446: Kafka Streams restoration crashes with NPE when the record value is null (#6842)
98f5c53 is described below
commit 98f5c538a7403dafdbbcb5476907a9859ec3876b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sat Jun 1 19:12:58 2019 -0700
KAFKA-8446: Kafka Streams restoration crashes with NPE when the record value is null (#6842)
When the restored record value is null, we are in danger of NPE during restoration phase.
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../streams/state/internals/RecordConverters.java | 11 +-
.../StateRestorationIntegrationTest.java | 122 +++++++++++++++++++++
.../state/internals/RecordConvertersTest.java | 49 +++++++++
3 files changed, 177 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index f65cc32..8305d52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -27,6 +27,11 @@ public final class RecordConverters {
private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> {
final byte[] rawValue = record.value();
final long timestamp = record.timestamp();
+ final byte[] recordValue = rawValue == null ? null :
+ ByteBuffer.allocate(8 + rawValue.length)
+ .putLong(timestamp)
+ .put(rawValue)
+ .array();
return new ConsumerRecord<>(
record.topic(),
record.partition(),
@@ -37,11 +42,7 @@ public final class RecordConverters {
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
- ByteBuffer
- .allocate(8 + rawValue.length)
- .putLong(timestamp)
- .put(rawValue)
- .array(),
+ recordValue,
record.headers(),
record.leaderEpoch()
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
new file mode 100644
index 0000000..e22ff4f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+@Category({IntegrationTest.class})
+public class StateRestorationIntegrationTest {
+ private StreamsBuilder builder = new StreamsBuilder();
+
+ private static final String APPLICATION_ID = "restoration-test-app";
+ private static final String STATE_STORE_NAME = "stateStore";
+ private static final String INPUT_TOPIC = "input";
+ private static final String OUTPUT_TOPIC = "output";
+
+ private Properties streamsConfiguration;
+
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+ private final MockTime mockTime = CLUSTER.time;
+
+ @Before
+ public void setUp() throws Exception {
+ final Properties props = new Properties();
+
+ streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+ APPLICATION_ID,
+ CLUSTER.bootstrapServers(),
+ Serdes.Integer().getClass().getName(),
+ Serdes.ByteArray().getClass().getName(),
+ props);
+
+ CLUSTER.createTopics(INPUT_TOPIC);
+ CLUSTER.createTopics(OUTPUT_TOPIC);
+
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ }
+
+ @Test
+ public void shouldRestoreNullRecord() throws InterruptedException, ExecutionException {
+ builder.table(INPUT_TOPIC, Materialized.<Integer, Bytes>as(
+ Stores.persistentTimestampedKeyValueStore(STATE_STORE_NAME))
+ .withKeySerde(Serdes.Integer())
+ .withValueSerde(Serdes.Bytes())
+ .withCachingDisabled()).toStream().to(OUTPUT_TOPIC);
+
+ final Properties producerConfig = TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(), IntegerSerializer.class, BytesSerializer.class);
+
+ final List<KeyValue<Integer, Bytes>> initialKeyValues = Arrays.asList(
+ KeyValue.pair(3, new Bytes(new byte[]{3})),
+ KeyValue.pair(3, null),
+ KeyValue.pair(1, new Bytes(new byte[]{1})));
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ INPUT_TOPIC, initialKeyValues, producerConfig, mockTime);
+
+ KafkaStreams streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
+ streams.start();
+
+ final Properties consumerConfig = TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(), IntegerDeserializer.class, BytesDeserializer.class);
+
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ consumerConfig, OUTPUT_TOPIC, initialKeyValues);
+
+ // wipe out state store to trigger restore process on restart
+ streams.close();
+ streams.cleanUp();
+
+ // Restart the stream instance. There should not be exception handling the null value within changelog topic.
+ final List<KeyValue<Integer, Bytes>> newKeyValues =
+ Collections.singletonList(KeyValue.pair(2, new Bytes(new byte[3])));
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ INPUT_TOPIC, newKeyValues, producerConfig, mockTime);
+ streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
+ streams.start();
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ consumerConfig, OUTPUT_TOPIC, newKeyValues);
+ streams.close();
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
new file mode 100644
index 0000000..bacbacd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+public class RecordConvertersTest {
+
+ private final RecordConverter timestampedValueConverter = rawValueToTimestampedValue();
+
+ @Test
+ public void shouldPreserveNullValueOnConversion() {
+ final ConsumerRecord<byte[], byte[]> nullValueRecord = new ConsumerRecord<>("", 0, 0L, new byte[0], null);
+ assertNull(timestampedValueConverter.convert(nullValueRecord).value());
+ }
+
+ @Test
+ public void shouldAddTimestampToValueOnConversionWhenValueIsNotNull() {
+ final long timestamp = 10L;
+ final byte[] value = new byte[1];
+ final ConsumerRecord<byte[], byte[]> inputRecord = new ConsumerRecord<>(
+ "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], value);
+ final byte[] expectedValue = ByteBuffer.allocate(9).putLong(timestamp).put(value).array();
+ final byte[] actualValue = timestampedValueConverter.convert(inputRecord).value();
+ assertArrayEquals(expectedValue, actualValue);
+ }
+}