You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/29 10:13:19 UTC
kafka git commit: KAFKA-5932; Avoid call to fetchPrevious in FlushListeners
Repository: kafka
Updated Branches:
refs/heads/trunk 082def05c -> 36556b804
KAFKA-5932; Avoid call to fetchPrevious in FlushListeners
Author: Bill Bejeck <bi...@confluent.io>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #3978 from bbejeck/KAFKA-5932_no_fetch_previous_when_no_old_values_returned
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36556b80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36556b80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36556b80
Branch: refs/heads/trunk
Commit: 36556b8041d3647375380e6fd70b8f37ba572ddc
Parents: 082def0
Author: Bill Bejeck <bi...@confluent.io>
Authored: Fri Sep 29 11:11:12 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Sep 29 11:11:12 2017 +0100
----------------------------------------------------------------------
.../kstream/internals/TupleForwarder.java | 2 +-
.../state/internals/CachedStateStore.java | 4 +-
.../state/internals/CachingKeyValueStore.java | 9 +++-
.../state/internals/CachingSessionStore.java | 7 ++-
.../state/internals/CachingWindowStore.java | 9 +++-
.../internals/CachingKeyValueStoreTest.java | 17 +++++--
.../internals/CachingSessionStoreTest.java | 51 +++++++++++++++++++-
.../state/internals/CachingWindowStoreTest.java | 14 +++++-
8 files changed, 100 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index f07d7bb..4c02d1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -42,7 +42,7 @@ class TupleForwarder<K, V> {
this.context = context;
this.sendOldValues = sendOldValues;
if (this.cachedStateStore != null) {
- cachedStateStore.setFlushListener(flushListener);
+ cachedStateStore.setFlushListener(flushListener, sendOldValues);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
index 2f0fa1c..4bc813c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
@@ -23,6 +23,8 @@ public interface CachedStateStore<K, V> {
* Set the {@link CacheFlushListener} to be notified when entries are flushed from the
* cache to the underlying {@link org.apache.kafka.streams.processor.StateStore}
* @param listener
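+ * @param sendOldValues whether the listener should be given the previous value on flush; when false, implementations may pass null instead of looking it up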
*/
- void setFlushListener(final CacheFlushListener<K, V> listener);
+ void setFlushListener(final CacheFlushListener<K, V> listener,
+ final boolean sendOldValues);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index a89c741..f0669a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -38,6 +38,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private CacheFlushListener<K, V> flushListener;
+ private boolean sendOldValues;
private String cacheName;
private ThreadCache cache;
private InternalProcessorContext context;
@@ -87,9 +88,10 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
context.setRecordContext(entry.recordContext());
if (flushListener != null) {
+ final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
flushListener.apply(serdes.keyFrom(entry.key().get()),
serdes.valueFrom(entry.newValue()),
- serdes.valueFrom(underlying.get(entry.key())));
+ oldValue);
}
underlying.put(entry.key(), entry.newValue());
@@ -98,8 +100,11 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
}
- public void setFlushListener(final CacheFlushListener<K, V> flushListener) {
+ public void setFlushListener(final CacheFlushListener<K, V> flushListener,
+ final boolean sendOldValues) {
+
this.flushListener = flushListener;
+ this.sendOldValues = sendOldValues;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index a6a82cb..05851e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -47,6 +47,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
private StateSerdes<K, AGG> serdes;
private InternalProcessorContext context;
private CacheFlushListener<Windowed<K>, AGG> flushListener;
+ private boolean sendOldValues;
private String topic;
CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
@@ -170,7 +171,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
if (flushListener != null) {
final AGG newValue = serdes.valueFrom(entry.newValue());
- final AGG oldValue = fetchPrevious(rawKey, key.window());
+ final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null;
if (!(newValue == null && oldValue == null)) {
flushListener.apply(key, newValue, oldValue);
}
@@ -202,8 +203,10 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
bytesStore.close();
}
- public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> flushListener) {
+ public void setFlushListener(final CacheFlushListener<Windowed<K>, AGG> flushListener,
+ final boolean sendOldValues) {
this.flushListener = flushListener;
+ this.sendOldValues = sendOldValues;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 799ecda..19e6e09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -49,6 +49,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
private StateSerdes<K, V> serdes;
private StateSerdes<Bytes, byte[]> bytesSerdes;
private CacheFlushListener<Windowed<K>, V> flushListener;
+ private boolean sendOldValues;
private final SegmentedCacheFunction cacheFunction;
CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
@@ -110,16 +111,20 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
final RecordContext current = context.recordContext();
context.setRecordContext(entry.recordContext());
try {
+ final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
flushListener.apply(windowedKey,
- serdes.valueFrom(entry.newValue()), fetchPrevious(key, windowedKey.window().start()));
+ serdes.valueFrom(entry.newValue()), oldValue);
} finally {
context.setRecordContext(current);
}
}
}
- public void setFlushListener(CacheFlushListener<Windowed<K>, V> flushListener) {
+ public void setFlushListener(final CacheFlushListener<Windowed<K>, V> flushListener,
+ final boolean sendOldValues) {
+
this.flushListener = flushListener;
+ this.sendOldValues = sendOldValues;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 73cdb25..97a2fbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -50,8 +50,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@@ -69,7 +69,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
cacheFlushListener = new CacheFlushListenerStub<>();
store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
- store.setFlushListener(cacheFlushListener);
+ store.setFlushListener(cacheFlushListener, false);
cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
topic = "topic";
@@ -103,7 +103,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
- inner.setFlushListener(cacheFlushListener);
+ inner.setFlushListener(cacheFlushListener, false);
store.init(context, store);
return store;
}
@@ -152,6 +152,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@Test
public void shouldForwardOldValuesWhenEnabled() {
+ store.setFlushListener(cacheFlushListener, true);
store.put(bytesKey("1"), bytesValue("a"));
store.flush();
store.put(bytesKey("1"), bytesValue("b"));
@@ -161,6 +162,16 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
}
@Test
+ public void shouldNotForwardOldValuesWhenDisabled() {
+ store.put(bytesKey("1"), bytesValue("a"));
+ store.flush();
+ store.put(bytesKey("1"), bytesValue("b"));
+ store.flush();
+ assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
+ assertNull(cacheFlushListener.forwarded.get("1").oldValue);
+ }
+
+ @Test
public void shouldIterateAllStoredItems() throws IOException {
int items = addItemsToCache();
final KeyValueIterator<Bytes, byte[]> all = store.all();
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index db19294..bda385e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -237,7 +237,7 @@ public class CachingSessionStoreTest {
public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
- });
+ }, true);
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
@@ -254,6 +254,55 @@ public class CachingSessionStoreTest {
}
@Test
+ public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
+ final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
+ final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
+ final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
+ cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
+ @Override
+ public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
+ flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
+ }
+ }, false);
+
+ cachingStore.put(a, "1".getBytes());
+ cachingStore.flush();
+
+ cachingStore.put(a, "2".getBytes());
+ cachingStore.flush();
+
+ cachingStore.remove(a);
+ cachingStore.flush();
+
+ assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
+ KeyValue.pair(aDeserialized, new Change<>("2", null)),
+ KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+ }
+
+ @Test
+ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
+ final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
+ final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
+ final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
+ cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
+ @Override
+ public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
+ flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
+ }
+ }, false);
+
+ cachingStore.put(a, "1".getBytes());
+ cachingStore.flush();
+
+ cachingStore.put(a, "2".getBytes());
+ cachingStore.flush();
+
+
+ assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
+ KeyValue.pair(aDeserialized, new Change<>("2", null))));
+ }
+
+ @Test
public void shouldClearNamespaceCacheOnClose() {
final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
cachingStore.put(a1, "1".getBytes());
http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index ea2c47e..f1a0038 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -77,7 +77,7 @@ public class CachingWindowStoreTest {
Serdes.String(),
WINDOW_SIZE,
Segments.segmentInterval(retention, numSegments));
- cachingStore.setFlushListener(cacheListener);
+ cachingStore.setFlushListener(cacheListener, false);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
topic = "topic";
context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
@@ -154,6 +154,7 @@ public class CachingWindowStoreTest {
@Test
public void shouldForwardOldValuesWhenEnabled() {
+ cachingStore.setFlushListener(cacheListener, true);
final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.flush();
@@ -164,6 +165,17 @@ public class CachingWindowStoreTest {
}
@Test
+ public void shouldForwardOldValuesWhenDisabled() {
+ final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
+ cachingStore.put(bytesKey("1"), bytesValue("a"));
+ cachingStore.flush();
+ cachingStore.put(bytesKey("1"), bytesValue("b"));
+ cachingStore.flush();
+ assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
+ assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+ }
+
+ @Test
public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
int numRecords = addItemsToCache();
assertEquals(numRecords, cacheListener.forwarded.size());