You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/28 18:35:59 UTC
[kafka] branch 2.5 updated: KAFKA-9921: disable caching on stores
configured to retain duplicates (#8564)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 91bcc53 KAFKA-9921: disable caching on stores configured to retain duplicates (#8564)
91bcc53 is described below
commit 91bcc53ca737161712ce2572646bcfe749ea75fd
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Apr 28 11:33:41 2020 -0700
KAFKA-9921: disable caching on stores configured to retain duplicates (#8564)
These two options are essentially incompatible: caching cannot reduce downstream traffic or writes when the store must allow non-unique keys (skipping records whose value is also unchanged is a separate issue; see KIP-557). Worse, enabling caching on a store that is configured to retain duplicates is not merely ineffective — it currently causes incorrect results.
We should simply log a warning and disable caching whenever a store is configured to retain duplicates; this avoids introducing a regression. When 3.0 comes around, we may want to consider throwing an exception instead, to alert the user more aggressively.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>
---
.../java/org/apache/kafka/streams/state/Stores.java | 7 ++++---
.../internals/TimestampedWindowStoreBuilder.java | 8 ++++++++
.../streams/state/internals/WindowStoreBuilder.java | 8 ++++++++
.../state/internals/WindowStoreBuilderTest.java | 20 ++++++++++++++++++++
4 files changed, 40 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 91e767b..b884f6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -187,7 +187,7 @@ public final class Stores {
* is not stored with the records, so this value is used to compute the keys that
* the store returns. No effort is made to validate this parameter, so you must be
* careful to set it the same as the windowed keys you're actually storing.
- * @param retainDuplicates whether or not to retain duplicates.
+ * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
@@ -226,7 +226,7 @@ public final class Stores {
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
- * @param retainDuplicates whether or not to retain duplicates.
+ * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
@@ -251,7 +251,7 @@ public final class Stores {
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
- * @param retainDuplicates whether or not to retain duplicates.
+ * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
@@ -321,6 +321,7 @@ public final class Stores {
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param windowSize size of the windows (cannot be negative)
+ * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index d545975..4318939 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -31,9 +31,12 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TimestampedWindowStoreBuilder<K, V>
extends AbstractStoreBuilder<K, ValueAndTimestamp<V>, TimestampedWindowStore<K, V>> {
+ private final Logger log = LoggerFactory.getLogger(TimestampedWindowStoreBuilder.class);
private final WindowBytesStoreSupplier storeSupplier;
@@ -56,6 +59,11 @@ public class TimestampedWindowStoreBuilder<K, V>
store = new InMemoryTimestampedWindowStoreMarker(store);
}
}
+ if (storeSupplier.retainDuplicates() && enableCaching) {
+ log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name());
+ enableCaching = false;
+ }
+
return new MeteredTimestampedWindowStore<>(
maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.windowSize(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
index ea30d69..7225645 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -21,8 +21,11 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
+ private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class);
private final WindowBytesStoreSupplier storeSupplier;
@@ -36,6 +39,11 @@ public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowS
@Override
public WindowStore<K, V> build() {
+ if (storeSupplier.retainDuplicates() && enableCaching) {
+ log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name());
+ enableCaching = false;
+ }
+
return new MeteredWindowStore<>(
maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
storeSupplier.windowSize(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
index bf29d4a..ed43c4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -17,10 +17,13 @@
package org.apache.kafka.streams.state.internals;
+import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.easymock.EasyMockRunner;
@@ -37,6 +40,7 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertFalse;
@RunWith(EasyMockRunner.class)
public class WindowStoreBuilderTest {
@@ -113,6 +117,22 @@ public class WindowStoreBuilderTest {
assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldDisableCachingWithRetainDuplicates() {
+ supplier = Stores.persistentWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true);
+ final StoreBuilder<WindowStore<String, String>> builder = new WindowStoreBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
+ ).withCachingEnabled();
+
+ builder.build();
+
+ assertFalse(((AbstractStoreBuilder<String, String, WindowStore<String, String>>) builder).enableCaching);
+ }
+
@SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {