You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/11 18:21:06 UTC

kafka git commit: KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger

Repository: kafka
Updated Branches:
  refs/heads/trunk 978393446 -> 64a860c58


KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger

The `StoreChangeLogger` currently keeps a cache of dirty and removed keys and batches the changelog records so that we don't send a record for each update. However, with KIP-63 this is unnecessary, as the batching and de-duping are done by the caching layer. Further, the `StoreChangeLogger` relies on `context.timestamp()`, which is likely to be incorrect when caching is enabled.

Author: Damian Guy <da...@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2103 from dguy/store-change-logger


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64a860c5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64a860c5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64a860c5

Branch: refs/heads/trunk
Commit: 64a860c5853e21ff91109d0acb74b42f2dc20c7c
Parents: 9783934
Author: Damian Guy <da...@gmail.com>
Authored: Fri Nov 11 10:21:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Nov 11 10:21:03 2016 -0800

----------------------------------------------------------------------
 .../state/internals/CachingKeyValueStore.java   | 21 +++---
 .../internals/InMemoryKeyValueLoggedStore.java  | 23 ++----
 .../streams/state/internals/RocksDBStore.java   | 24 ++----
 .../state/internals/RocksDBWindowStore.java     | 26 ++-----
 .../state/internals/StoreChangeLogger.java      | 78 +++-----------------
 .../internals/RocksDBKeyValueStoreTest.java     |  8 +-
 .../state/internals/StoreChangeLoggerTest.java  | 52 +++----------
 .../apache/kafka/test/KStreamTestDriver.java    |  5 +-
 8 files changed, 60 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 81ff5b5..ab050b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.ArrayList;
 import java.util.List;
 
 class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStore<K, V> {
@@ -78,27 +77,27 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
         cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
-                final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
                 for (ThreadCache.DirtyEntry entry : entries) {
-                    keyValues.add(KeyValue.pair(entry.key(), entry.newValue()));
-                    maybeForward(entry, (InternalProcessorContext) context);
+                    putAndMaybeForward(entry, (InternalProcessorContext) context);
                 }
-                underlying.putAll(keyValues);
             }
         });
 
     }
 
-    private void maybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
-        if (flushListener != null) {
-            final RecordContext current = context.recordContext();
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
+        final RecordContext current = context.recordContext();
+        try {
             context.setRecordContext(entry.recordContext());
-            try {
+            if (flushListener != null) {
+
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
                                     serdes.valueFrom(entry.newValue()), serdes.valueFrom(underlying.get(entry.key())));
-            } finally {
-                context.setRecordContext(current);
+
             }
+            underlying.put(entry.key(), entry.newValue());
+        } finally {
+            context.setRecordContext(current);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 4f056ec..d81f6fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -35,7 +35,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     private final String storeName;
 
     private StoreChangeLogger<K, V> changeLogger;
-    private StoreChangeLogger.ValueGetter<K, V> getter;
+    private ProcessorContext context;
 
     public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
         this.storeName = storeName;
@@ -52,6 +52,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
+        this.context = context;
         inner.init(context, root);
 
         // construct the serde
@@ -61,12 +62,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
-        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
-            @Override
-            public V get(K key) {
-                return inner.get(key);
-            }
-        };
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (inner instanceof MemoryLRUCache) {
@@ -98,16 +93,14 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     public void put(K key, V value) {
         this.inner.put(key, value);
 
-        changeLogger.add(key);
-        changeLogger.maybeLogChange(this.getter);
+        changeLogger.logChange(key, value);
     }
 
     @Override
     public V putIfAbsent(K key, V value) {
         V originalValue = this.inner.putIfAbsent(key, value);
         if (originalValue == null) {
-            changeLogger.add(key);
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(key, value);
         }
         return originalValue;
     }
@@ -118,9 +111,8 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
         for (KeyValue<K, V> entry : entries) {
             K key = entry.key;
-            changeLogger.add(key);
+            changeLogger.logChange(key, entry.value);
         }
-        changeLogger.maybeLogChange(this.getter);
     }
 
     @Override
@@ -139,8 +131,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
      * @param key the key for the entry that the inner store removed
      */
     protected void removed(K key) {
-        changeLogger.delete(key);
-        changeLogger.maybeLogChange(this.getter);
+        changeLogger.logChange(key, null);
     }
 
     @Override
@@ -166,7 +157,5 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public void flush() {
         this.inner.flush();
-
-        changeLogger.logChange(getter);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b0f5ee4..9c6703a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -95,9 +95,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private boolean loggingEnabled = false;
 
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
-    private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
     protected volatile boolean open = false;
+    private ProcessorContext context;
 
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
@@ -139,6 +139,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
+        this.context = context;
         final Map<String, Object> configs = context.appConfigs();
         final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
         if (configSetterClass != null) {
@@ -162,13 +163,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
-            @Override
-            public byte[] get(Bytes key) {
-                return getInternal(key.get());
-            }
-        };
-
         context.register(root, loggingEnabled, new StateRestoreCallback() {
 
             @Override
@@ -244,8 +238,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         putInternal(rawKey, rawValue);
 
         if (loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
         }
     }
 
@@ -289,16 +282,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 if (entry.value == null) {
                     db.remove(rawKey);
                 } else {
-                    batch.put(rawKey, serdes.rawValue(entry.value));
+                    final byte[] value = serdes.rawValue(entry.value);
+                    batch.put(rawKey, value);
                     if (loggingEnabled) {
-                        changeLogger.add(Bytes.wrap(rawKey));
+                        changeLogger.logChange(Bytes.wrap(rawKey), value);
                     }
                 }
             }
             db.write(wOptions, batch);
-            if (loggingEnabled) {
-                changeLogger.maybeLogChange(getter);
-            }
         } catch (RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
         }
@@ -368,9 +359,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         if (db == null) {
             return;
         }
-        if (loggingEnabled) {
-            changeLogger.logChange(getter);
-        }
         // flush RocksDB
         flushInternal();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index dd24320..b563137 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -142,7 +142,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
-    private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
     private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
 
     private ProcessorContext context;
@@ -166,11 +165,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         this.retainDuplicates = retainDuplicates;
 
-        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
-            public byte[] get(Bytes key) {
-                return getInternal(key.get());
-            }
-        };
 
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
@@ -262,9 +256,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 segment.flush();
             }
         }
-
-        if (loggingEnabled)
-            changeLogger.logChange(this.getter);
     }
 
     @Override
@@ -279,25 +270,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        byte[] rawKey = putAndReturnInternalKey(key, value, context.timestamp());
-
-        if (rawKey != null && loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
-        }
+        put(key, value, context.timestamp());
     }
 
     @Override
     public void put(K key, V value, long timestamp) {
-        byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
+        final byte[] rawValue = serdes.rawValue(value);
+        byte[] rawKey = putAndReturnInternalKey(key, rawValue, timestamp);
 
         if (rawKey != null && loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
         }
     }
 
-    private byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
+    private byte[] putAndReturnInternalKey(K key, byte[] value, long timestamp) {
         long segmentId = segmentId(timestamp);
 
         if (segmentId > currentSegmentId) {
@@ -312,7 +298,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
-            segment.put(Bytes.wrap(binaryKey), serdes.rawValue(value));
+            segment.put(Bytes.wrap(binaryKey), value);
             return binaryKey;
         } else {
             return null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 83e3d10..44cda5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -24,12 +24,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
- * Store change log collector that batches updates before sending to Kafka.
- *
  * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
  * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
  * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
@@ -37,87 +32,34 @@ import java.util.Set;
  * @param <K>
  * @param <V>
  */
-public class StoreChangeLogger<K, V> {
-
-    public interface ValueGetter<K, V> {
-        V get(K key);
-    }
-
-    // TODO: these values should be configurable
-    protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
+class StoreChangeLogger<K, V> {
 
     protected final StateSerdes<K, V> serialization;
 
     private final String topic;
     private final int partition;
     private final ProcessorContext context;
-    private final int maxDirty;
-    private final int maxRemoved;
+    private final RecordCollector collector;
 
-    protected Set<K> dirty;
-    protected Set<K> removed;
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
-        this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+    StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
+        this(storeName, context, context.taskId().partition, serialization);
     }
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
-        init();
-    }
-
-    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
+    private StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization) {
         this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;
         this.serialization = serialization;
-        this.maxDirty = maxDirty;
-        this.maxRemoved = maxRemoved;
-    }
-
-    public void init() {
-        this.dirty = new HashSet<>();
-        this.removed = new HashSet<>();
+        this.collector = ((RecordCollector.Supplier) context).recordCollector();
     }
 
-    public void add(K key) {
-        this.dirty.add(key);
-        this.removed.remove(key);
-    }
-
-    public void delete(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-    }
-
-    public void maybeLogChange(ValueGetter<K, V> getter) {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange(getter);
-    }
-
-    public void logChange(ValueGetter<K, V> getter) {
-        if (this.removed.isEmpty() && this.dirty.isEmpty())
-            return;
-
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+    void logChange(final K key, final V value) {
         if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = getter.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
+            final Serializer<K> keySerializer = serialization.keySerializer();
+            final Serializer<V> valueSerializer = serialization.valueSerializer();
+            collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), key, value), keySerializer, valueSerializer);
         }
     }
 
-    public void clear() {
-        this.removed.clear();
-        this.dirty.clear();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 7dcd63d..b49c01f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -97,7 +97,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldPerformRangeQueriesWithCachingDisabled() throws Exception {
         final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        final MockProcessorContext context = (MockProcessorContext) driver.context();
+        final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+        context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
         final KeyValueIterator<Integer, String> range = store.range(1, 2);
@@ -109,7 +111,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldPerformAllQueriesWithCachingDisabled() throws Exception {
         final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        final MockProcessorContext context = (MockProcessorContext) driver.context();
+        final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+        context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
         final KeyValueIterator<Integer, String> range = store.all();

http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 4e8a497..ce4a360 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class StoreChangeLoggerTest {
 
@@ -57,52 +58,23 @@ public class StoreChangeLoggerTest {
             }
     );
 
-    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
+    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
 
-    private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
-        @Override
-        public String get(Integer key) {
-            return written.get(key);
-        }
-    };
 
     @SuppressWarnings("unchecked")
     @Test
     public void testAddRemove() throws Exception {
-
         context.setTime(1);
-        written.put(0, "zero");
-        changeLogger.add(0);
-        written.put(1, "one");
-        changeLogger.add(1);
-        written.put(2, "two");
-        changeLogger.add(2);
-
-        assertEquals(3, changeLogger.dirty.size());
-        assertEquals(0, changeLogger.removed.size());
-
-        changeLogger.delete(0);
-        changeLogger.delete(1);
-        written.put(3, "three");
-        changeLogger.add(3);
-        assertEquals(2, changeLogger.dirty.size());
-        assertEquals(2, changeLogger.removed.size());
-
-        written.put(0, "zero-again");
-        changeLogger.add(0);
-        assertEquals(3, changeLogger.dirty.size());
-        assertEquals(1, changeLogger.removed.size());
-
-        written.put(4, "four");
-        changeLogger.add(4);
-        changeLogger.maybeLogChange(getter);
-        assertEquals(0, changeLogger.dirty.size());
-        assertEquals(0, changeLogger.removed.size());
-        assertEquals(5, logged.size());
-        assertEquals("zero-again", logged.get(0));
-        assertEquals(null, logged.get(1));
+        changeLogger.logChange(0, "zero");
+        changeLogger.logChange(1, "one");
+        changeLogger.logChange(2, "two");
+
+        assertEquals("zero", logged.get(0));
+        assertEquals("one", logged.get(1));
         assertEquals("two", logged.get(2));
-        assertEquals("three", logged.get(3));
-        assertEquals("four", logged.get(4));
+
+        changeLogger.logChange(0, null);
+        assertNull(logged.get(0));
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64a860c5/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 05abbc6..14e15a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -100,12 +100,15 @@ public class KStreamTestDriver {
     }
 
     public void process(String topicName, Object key, Object value) {
+        final ProcessorNode previous = currNode;
         currNode = topology.source(topicName);
 
         // if currNode is null, check if this topic is a changelog topic;
         // if yes, skip
-        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))
+        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
+            currNode = previous;
             return;
+        }
         context.setRecordContext(createRecordContext(context.timestamp()));
         context.setCurrentNode(currNode);
         try {