You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/29 10:13:19 UTC

kafka git commit: KAFKA-5932; Avoid call to fetchPrevious in FlushListeners

Repository: kafka
Updated Branches:
  refs/heads/trunk 082def05c -> 36556b804


KAFKA-5932; Avoid call to fetchPrevious in FlushListeners

Author: Bill Bejeck <bi...@confluent.io>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>

Closes #3978 from bbejeck/KAFKA-5932_no_fetch_previous_when_no_old_values_returned


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36556b80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36556b80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36556b80

Branch: refs/heads/trunk
Commit: 36556b8041d3647375380e6fd70b8f37ba572ddc
Parents: 082def0
Author: Bill Bejeck <bi...@confluent.io>
Authored: Fri Sep 29 11:11:12 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Sep 29 11:11:12 2017 +0100

----------------------------------------------------------------------
 .../kstream/internals/TupleForwarder.java       |  2 +-
 .../state/internals/CachedStateStore.java       |  4 +-
 .../state/internals/CachingKeyValueStore.java   |  9 +++-
 .../state/internals/CachingSessionStore.java    |  7 ++-
 .../state/internals/CachingWindowStore.java     |  9 +++-
 .../internals/CachingKeyValueStoreTest.java     | 17 +++++--
 .../internals/CachingSessionStoreTest.java      | 51 +++++++++++++++++++-
 .../state/internals/CachingWindowStoreTest.java | 14 +++++-
 8 files changed, 100 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index f07d7bb..4c02d1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -42,7 +42,7 @@ class TupleForwarder<K, V> {
         this.context = context;
         this.sendOldValues = sendOldValues;
         if (this.cachedStateStore != null) {
-            cachedStateStore.setFlushListener(flushListener);
+            cachedStateStore.setFlushListener(flushListener, sendOldValues);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
index 2f0fa1c..4bc813c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
@@ -23,6 +23,8 @@ public interface CachedStateStore<K, V> {
      * Set the {@link CacheFlushListener} to be notified when entries are flushed from the
      * cache to the underlying {@link org.apache.kafka.streams.processor.StateStore}
      * @param listener
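+     * @param sendOldValues if {@code false}, implementations may skip fetching the previous value and pass {@code null} to the listener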
      */
-    void setFlushListener(final CacheFlushListener<K, V> listener);
+    void setFlushListener(final CacheFlushListener<K, V> listener,
+                          final boolean sendOldValues);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index a89c741..f0669a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -38,6 +38,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private CacheFlushListener<K, V> flushListener;
+    private boolean sendOldValues;
     private String cacheName;
     private ThreadCache cache;
     private InternalProcessorContext context;
@@ -87,9 +88,10 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
             context.setRecordContext(entry.recordContext());
             if (flushListener != null) {
 
+                final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
                                     serdes.valueFrom(entry.newValue()),
-                                    serdes.valueFrom(underlying.get(entry.key())));
+                                    oldValue);
 
             }
             underlying.put(entry.key(), entry.newValue());
@@ -98,8 +100,11 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         }
     }
 
-    public void setFlushListener(final CacheFlushListener<K, V> flushListener) {
+    public void setFlushListener(final CacheFlushListener<K, V> flushListener,
+                                 final boolean sendOldValues) {
+
         this.flushListener = flushListener;
+        this.sendOldValues = sendOldValues;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index a6a82cb..05851e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -47,6 +47,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     private StateSerdes<K, AGG> serdes;
     private InternalProcessorContext context;
     private CacheFlushListener<Windowed<K>, AGG> flushListener;
+    private boolean sendOldValues;
     private String topic;
 
     CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
@@ -170,7 +171,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
             final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
-                final AGG oldValue = fetchPrevious(rawKey, key.window());
+                final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null;
                 if (!(newValue == null && oldValue == null)) {
                     flushListener.apply(key, newValue, oldValue);
                 }
@@ -202,8 +203,10 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         bytesStore.close();
     }
 
-    public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> flushListener) {
+    public void setFlushListener(final CacheFlushListener<Windowed<K>, AGG> flushListener,
+                                 final boolean sendOldValues) {
         this.flushListener = flushListener;
+        this.sendOldValues = sendOldValues;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 799ecda..19e6e09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -49,6 +49,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     private StateSerdes<K, V> serdes;
     private StateSerdes<Bytes, byte[]> bytesSerdes;
     private CacheFlushListener<Windowed<K>, V> flushListener;
+    private boolean sendOldValues;
     private final SegmentedCacheFunction cacheFunction;
 
     CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
@@ -110,16 +111,20 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
             final RecordContext current = context.recordContext();
             context.setRecordContext(entry.recordContext());
             try {
+                final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
                 flushListener.apply(windowedKey,
-                                    serdes.valueFrom(entry.newValue()), fetchPrevious(key, windowedKey.window().start()));
+                                    serdes.valueFrom(entry.newValue()), oldValue);
             } finally {
                 context.setRecordContext(current);
             }
         }
     }
 
-    public void setFlushListener(CacheFlushListener<Windowed<K>, V> flushListener) {
+    public void setFlushListener(final CacheFlushListener<Windowed<K>, V> flushListener,
+                                 final boolean sendOldValues) {
+
         this.flushListener = flushListener;
+        this.sendOldValues = sendOldValues;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 73cdb25..97a2fbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -50,8 +50,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
@@ -69,7 +69,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
         cacheFlushListener = new CacheFlushListenerStub<>();
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
-        store.setFlushListener(cacheFlushListener);
+        store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
         topic = "topic";
@@ -103,7 +103,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
 
         final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
-        inner.setFlushListener(cacheFlushListener);
+        inner.setFlushListener(cacheFlushListener, false);
         store.init(context, store);
         return store;
     }
@@ -152,6 +152,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @Test
     public void shouldForwardOldValuesWhenEnabled() {
+        store.setFlushListener(cacheFlushListener, true);
         store.put(bytesKey("1"), bytesValue("a"));
         store.flush();
         store.put(bytesKey("1"), bytesValue("b"));
@@ -161,6 +162,16 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     }
 
     @Test
+    public void shouldNotForwardOldValuesWhenDisabled() {
+        store.put(bytesKey("1"), bytesValue("a"));
+        store.flush();
+        store.put(bytesKey("1"), bytesValue("b"));
+        store.flush();
+        assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
+    }
+
+    @Test
     public void shouldIterateAllStoredItems() throws IOException {
         int items = addItemsToCache();
         final KeyValueIterator<Bytes, byte[]> all = store.all();

http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index db19294..bda385e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -237,7 +237,7 @@ public class CachingSessionStoreTest {
                 public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
                     flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
                 }
-            });
+            }, true);
         
         cachingStore.put(a, "1".getBytes());
         cachingStore.flush();
@@ -254,6 +254,55 @@ public class CachingSessionStoreTest {
     }
 
     @Test
+    public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
+        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
+        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
+        final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
+        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
+            @Override
+            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
+                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
+            }
+        }, false);
+
+        cachingStore.put(a, "1".getBytes());
+        cachingStore.flush();
+
+        cachingStore.put(a, "2".getBytes());
+        cachingStore.flush();
+
+        cachingStore.remove(a);
+        cachingStore.flush();
+
+        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                                            KeyValue.pair(aDeserialized, new Change<>("2", null)),
+                                            KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+    }
+
+    @Test
+    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
+        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
+        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
+        final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
+        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
+            @Override
+            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
+                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
+            }
+        }, false);
+
+        cachingStore.put(a, "1".getBytes());
+        cachingStore.flush();
+
+        cachingStore.put(a, "2".getBytes());
+        cachingStore.flush();
+
+
+        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                                            KeyValue.pair(aDeserialized, new Change<>("2", null))));
+    }
+
+    @Test
     public void shouldClearNamespaceCacheOnClose() {
         final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
         cachingStore.put(a1, "1".getBytes());

http://git-wip-us.apache.org/repos/asf/kafka/blob/36556b80/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index ea2c47e..f1a0038 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -77,7 +77,7 @@ public class CachingWindowStoreTest {
                                                 Serdes.String(),
                                                 WINDOW_SIZE,
                                                 Segments.segmentInterval(retention, numSegments));
-        cachingStore.setFlushListener(cacheListener);
+        cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
         context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
@@ -154,6 +154,7 @@ public class CachingWindowStoreTest {
 
     @Test
     public void shouldForwardOldValuesWhenEnabled() {
+        cachingStore.setFlushListener(cacheListener, true);
         final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
         cachingStore.flush();
@@ -164,6 +165,17 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    public void shouldNotForwardOldValuesWhenDisabled() {
+        final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        cachingStore.put(bytesKey("1"), bytesValue("a"));
+        cachingStore.flush();
+        cachingStore.put(bytesKey("1"), bytesValue("b"));
+        cachingStore.flush();
+        assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+    }
+
+    @Test
     public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
         int numRecords = addItemsToCache();
         assertEquals(numRecords, cacheListener.forwarded.size());