You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/05/11 23:59:04 UTC

[kafka] branch 2.5 updated: KAFKA-9921: explicit handling of null values with retainDuplicates (#8626)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 3988e76  KAFKA-9921: explicit handling of null values with retainDuplicates (#8626)
3988e76 is described below

commit 3988e76936e93e306de53645d38b33fee81e0e09
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon May 11 16:35:02 2020 -0700

    KAFKA-9921: explicit handling of null values with retainDuplicates (#8626)
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>
---
 .../java/org/apache/kafka/streams/state/Stores.java  | 12 ++++++++----
 .../org/apache/kafka/streams/state/WindowStore.java  |  8 ++++++--
 .../streams/state/internals/InMemoryWindowStore.java | 11 +++++------
 .../streams/state/internals/RocksDBWindowStore.java  |  8 +++++---
 .../internals/TimestampedWindowStoreBuilderTest.java | 20 ++++++++++++++++++++
 5 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index b884f6d..066d499 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -187,7 +187,8 @@ public final class Stores {
      *                              is not stored with the records, so this value is used to compute the keys that
      *                              the store returns. No effort is made to validate this parameter, so you must be
      *                              careful to set it the same as the windowed keys you're actually storing.
-     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
+     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
+     *                              caching and means that null values will be ignored.
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
      */
@@ -226,7 +227,8 @@ public final class Stores {
      *                              windowed data's entire life cycle, from window-start through window-end,
      *                              and for the entire grace period)
      * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
+     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
+     *                              caching and means that null values will be ignored.
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
      */
@@ -251,7 +253,8 @@ public final class Stores {
      *                              windowed data's entire life cycle, from window-start through window-end,
      *                              and for the entire grace period)
      * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
+     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
+     *                              caching and means that null values will be ignored.
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
      */
@@ -321,7 +324,8 @@ public final class Stores {
      *                              windowed data's entire life cycle, from window-start through window-end,
      *                              and for the entire grace period.
      * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
+     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
+     *                              caching and means that null values will be ignored.
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index f5e69bf..f8c93e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -38,13 +38,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
     /**
      * Use the current record timestamp as the {@code windowStartTimestamp} and
      * delegate to {@link WindowStore#put(Object, Object, long)}.
-     *
+     * <p>
      * It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp
      * is unlikely to be the correct windowStartTimestamp in general.
      *
      * @param key The key to associate the value to
      * @param value The value to update, it can be null;
-     *              if the serialized bytes are also null it is interpreted as deletes
+     *              if the serialized bytes are also null, it is interpreted as a delete
      * @throws NullPointerException if the given key is {@code null}
      *
      * @deprecated as timestamp is not provided for the key-value pair, this causes inconsistency
@@ -57,6 +57,10 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
 
     /**
      * Put a key-value pair into the window with given window start timestamp
+     * <p>
+     * If the serialized value bytes are null, the put is interpreted as a delete. Note that deletes
+     * will be ignored if the underlying store retains duplicates.
+     *
      * @param key The key to associate the value to
      * @param value The value; can be null
      * @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index d9547c7..c396a68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -117,21 +117,21 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     @Override
     public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
         removeExpiredSegments();
-        maybeUpdateSeqnumForDups();
         observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
 
-        final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
-
         if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
             expiredRecordSensor.record();
             LOG.warn("Skipping record for expired segment.");
         } else {
             if (value != null) {
+                maybeUpdateSeqnumForDups();
+                final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
                 segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
                 segmentMap.get(windowStartTimestamp).put(keyBytes, value);
-            } else {
+            } else if (!retainDuplicates) {
+                // Skip if value is null and duplicates are allowed since this delete is a no-op
                 segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
-                    kvMap.remove(keyBytes);
+                    kvMap.remove(key);
                     if (kvMap.isEmpty()) {
                         segmentMap.remove(windowStartTimestamp);
                     }
@@ -288,7 +288,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         final byte[] bytes = new byte[keyBytes.get().length  - SEQNUM_SIZE];
         System.arraycopy(keyBytes.get(), 0, bytes, 0, bytes.length);
         return Bytes.wrap(bytes);
-
     }
 
     private WrappedInMemoryWindowStoreIterator registerNewWindowStoreIterator(final Bytes key,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index c45fe53..010c092 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -56,9 +56,11 @@ public class RocksDBWindowStore
 
     @Override
     public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
-        maybeUpdateSeqnumForDups();
-
-        wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum), value);
+        // Skip if value is null and duplicates are allowed since this delete is a no-op
+        if (!(value == null && retainDuplicates)) {
+            maybeUpdateSeqnumForDups();
+            wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum), value);
+        }
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index e6d3da5..a571344 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.easymock.EasyMockRunner;
@@ -37,6 +40,7 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertFalse;
 
 @RunWith(EasyMockRunner.class)
 public class TimestampedWindowStoreBuilderTest {
@@ -158,6 +162,22 @@ public class TimestampedWindowStoreBuilderTest {
         assertThat(((WrappedStateStore) store).wrapped(), instanceOf(WindowToTimestampedWindowByteStoreAdapter.class));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldDisableCachingWithRetainDuplicates() {
+        supplier = Stores.persistentTimestampedWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true);
+        final StoreBuilder<TimestampedWindowStore<String, String>> builder = new TimestampedWindowStoreBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime()
+        ).withCachingEnabled();
+
+        builder.build();
+
+        assertFalse(((AbstractStoreBuilder<String, String, TimestampedWindowStore<String, String>>) builder).enableCaching);
+    }
+
     @SuppressWarnings("all")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerIfInnerIsNull() {