You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/05/11 23:59:04 UTC
[kafka] branch 2.5 updated: KAFKA-9921: explicit handling of null values with retainDuplicates (#8626)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 3988e76 KAFKA-9921: explicit handling of null values with retainDuplicates (#8626)
3988e76 is described below
commit 3988e76936e93e306de53645d38b33fee81e0e09
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon May 11 16:35:02 2020 -0700
KAFKA-9921: explicit handling of null values with retainDuplicates (#8626)
Reviewer: Matthias J. Sax <ma...@confluent.io>
---
.../java/org/apache/kafka/streams/state/Stores.java | 12 ++++++++----
.../org/apache/kafka/streams/state/WindowStore.java | 8 ++++++--
.../streams/state/internals/InMemoryWindowStore.java | 11 +++++------
.../streams/state/internals/RocksDBWindowStore.java | 8 +++++---
.../internals/TimestampedWindowStoreBuilderTest.java | 20 ++++++++++++++++++++
5 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index b884f6d..066d499 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -187,7 +187,8 @@ public final class Stores {
* is not stored with the records, so this value is used to compute the keys that
* the store returns. No effort is made to validate this parameter, so you must be
* careful to set it the same as the windowed keys you're actually storing.
- * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
+ * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
+ * caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
@@ -226,7 +227,8 @@ public final class Stores {
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
- * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
+ * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
+ * caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
@@ -251,7 +253,8 @@ public final class Stores {
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
- * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
+ * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
+ * caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
@@ -321,7 +324,8 @@ public final class Stores {
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param windowSize size of the windows (cannot be negative)
- * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
+ * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
+ * caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index f5e69bf..f8c93e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -38,13 +38,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
/**
* Use the current record timestamp as the {@code windowStartTimestamp} and
* delegate to {@link WindowStore#put(Object, Object, long)}.
- *
+ * <p>
* It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp
* is unlikely to be the correct windowStartTimestamp in general.
*
* @param key The key to associate the value to
* @param value The value to update, it can be null;
- * if the serialized bytes are also null it is interpreted as deletes
+ * if the serialized bytes are also null it is interpreted as delete
* @throws NullPointerException if the given key is {@code null}
*
* @deprecated as timestamp is not provided for the key-value pair, this causes inconsistency
@@ -57,6 +57,10 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
/**
* Put a key-value pair into the window with given window start timestamp
+ * <p>
+ * If serialized value bytes are null it is interpreted as delete. Note that deletes will be
+ * ignored in the case of an underlying store that retains duplicates.
+ *
* @param key The key to associate the value to
* @param value The value; can be null
* @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index d9547c7..c396a68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -117,21 +117,21 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Override
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
removeExpiredSegments();
- maybeUpdateSeqnumForDups();
observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
- final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
-
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
expiredRecordSensor.record();
LOG.warn("Skipping record for expired segment.");
} else {
if (value != null) {
+ maybeUpdateSeqnumForDups();
+ final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
segmentMap.get(windowStartTimestamp).put(keyBytes, value);
- } else {
+ } else if (!retainDuplicates) {
+ // Skip if value is null and duplicates are allowed since this delete is a no-op
segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
- kvMap.remove(keyBytes);
+ kvMap.remove(key);
if (kvMap.isEmpty()) {
segmentMap.remove(windowStartTimestamp);
}
@@ -288,7 +288,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final byte[] bytes = new byte[keyBytes.get().length - SEQNUM_SIZE];
System.arraycopy(keyBytes.get(), 0, bytes, 0, bytes.length);
return Bytes.wrap(bytes);
-
}
private WrappedInMemoryWindowStoreIterator registerNewWindowStoreIterator(final Bytes key,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index c45fe53..010c092 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -56,9 +56,11 @@ public class RocksDBWindowStore
@Override
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
- maybeUpdateSeqnumForDups();
-
- wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum), value);
+ // Skip if value is null and duplicates are allowed since this delete is a no-op
+ if (!(value == null && retainDuplicates)) {
+ maybeUpdateSeqnumForDups();
+ wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum), value);
+ }
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index e6d3da5..a571344 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -17,9 +17,12 @@
package org.apache.kafka.streams.state.internals;
+import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.easymock.EasyMockRunner;
@@ -37,6 +40,7 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertFalse;
@RunWith(EasyMockRunner.class)
public class TimestampedWindowStoreBuilderTest {
@@ -158,6 +162,22 @@ public class TimestampedWindowStoreBuilderTest {
assertThat(((WrappedStateStore) store).wrapped(), instanceOf(WindowToTimestampedWindowByteStoreAdapter.class));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldDisableCachingWithRetainDuplicates() {
+ supplier = Stores.persistentTimestampedWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true);
+ final StoreBuilder<TimestampedWindowStore<String, String>> builder = new TimestampedWindowStoreBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
+ ).withCachingEnabled();
+
+ builder.build();
+
+ assertFalse(((AbstractStoreBuilder<String, String, TimestampedWindowStore<String, String>>) builder).enableCaching);
+ }
+
@SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {