You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/28 18:35:59 UTC

[kafka] branch 2.5 updated: KAFKA-9921: disable caching on stores configured to retain duplicates (#8564)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 91bcc53  KAFKA-9921: disable caching on stores configured to retain duplicates (#8564)
91bcc53 is described below

commit 91bcc53ca737161712ce2572646bcfe749ea75fd
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Apr 28 11:33:41 2020 -0700

    KAFKA-9921: disable caching on stores configured to retain duplicates (#8564)
    
    Caching and retaining duplicates are essentially incompatible options for a window store: caching will do nothing to reduce downstream traffic and writes when the store has to allow non-unique keys (skipping records where the value is also the same is a separate issue, see KIP-557). But enabling caching on a store that's configured to retain duplicates is more than just ineffective; it currently causes incorrect results.
    
    We should just log a warning and disable caching whenever a store is retaining duplicates to avoid introducing a regression. Maybe when 3.0 comes around we should consider throwing an exception instead to alert the user more aggressively.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>
---
 .../java/org/apache/kafka/streams/state/Stores.java  |  7 ++++---
 .../internals/TimestampedWindowStoreBuilder.java     |  8 ++++++++
 .../streams/state/internals/WindowStoreBuilder.java  |  8 ++++++++
 .../state/internals/WindowStoreBuilderTest.java      | 20 ++++++++++++++++++++
 4 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 91e767b..b884f6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -187,7 +187,7 @@ public final class Stores {
      *                              is not stored with the records, so this value is used to compute the keys that
      *                              the store returns. No effort is made to validate this parameter, so you must be
      *                              careful to set it the same as the windowed keys you're actually storing.
-     * @param retainDuplicates      whether or not to retain duplicates.
+     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
      */
@@ -226,7 +226,7 @@ public final class Stores {
      *                              windowed data's entire life cycle, from window-start through window-end,
      *                              and for the entire grace period)
      * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
+     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
      */
@@ -251,7 +251,7 @@ public final class Stores {
      *                              windowed data's entire life cycle, from window-start through window-end,
      *                              and for the entire grace period)
      * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
+     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
      */
@@ -321,6 +321,7 @@ public final class Stores {
      *                              windowed data's entire life cycle, from window-start through window-end,
      *                              and for the entire grace period.
      * @param windowSize            size of the windows (cannot be negative)
+     * @param retainDuplicates      whether or not to retain duplicates. turning this on will automatically disable caching
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index d545975..4318939 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -31,9 +31,12 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TimestampedWindowStoreBuilder<K, V>
     extends AbstractStoreBuilder<K, ValueAndTimestamp<V>, TimestampedWindowStore<K, V>> {
+    private final Logger log = LoggerFactory.getLogger(TimestampedWindowStoreBuilder.class);
 
     private final WindowBytesStoreSupplier storeSupplier;
 
@@ -56,6 +59,11 @@ public class TimestampedWindowStoreBuilder<K, V>
                 store = new InMemoryTimestampedWindowStoreMarker(store);
             }
         }
+        if (storeSupplier.retainDuplicates() && enableCaching) {
+            log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name());
+            enableCaching = false;
+        }
+
         return new MeteredTimestampedWindowStore<>(
             maybeWrapCaching(maybeWrapLogging(store)),
             storeSupplier.windowSize(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
index ea30d69..7225645 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -21,8 +21,11 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
+    private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class);
 
     private final WindowBytesStoreSupplier storeSupplier;
 
@@ -36,6 +39,11 @@ public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowS
 
     @Override
     public WindowStore<K, V> build() {
+        if (storeSupplier.retainDuplicates() && enableCaching) {
+            log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name());
+            enableCaching = false;
+        }
+
         return new MeteredWindowStore<>(
             maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
             storeSupplier.windowSize(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
index bf29d4a..ed43c4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -17,10 +17,13 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.easymock.EasyMockRunner;
@@ -37,6 +40,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertFalse;
 
 @RunWith(EasyMockRunner.class)
 public class WindowStoreBuilderTest {
@@ -113,6 +117,22 @@ public class WindowStoreBuilderTest {
         assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldDisableCachingWithRetainDuplicates() {
+        supplier = Stores.persistentWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true);
+        final StoreBuilder<WindowStore<String, String>> builder = new WindowStoreBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime()
+        ).withCachingEnabled();
+        
+        builder.build();
+
+        assertFalse(((AbstractStoreBuilder<String, String, WindowStore<String, String>>) builder).enableCaching);
+    }
+
     @SuppressWarnings("all")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerIfInnerIsNull() {