Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/24 05:10:53 UTC

[1/4] kafka git commit: KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect ProcessorNode

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 f91d95ac9 -> ecb51680a


KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect ProcessorNode

Given a topology like the one below, if a record arriving at `tableOne` causes a cache eviction, it triggers the `leftJoin`, which does a `get` from `reducer-store`. If the key is not currently cached in `reducer-store` but is in the backing store, it is put into the cache, and that put may in turn trigger another eviction. If that eviction's eldest entry is dirty, the dirty keys are flushed, and it is at this point that a ClassCastException is thrown: the ProcessorContext is still set to the context of the `leftJoin`, whose next child in the topology is `mapValues`.
We need to set the correct `ProcessorNode` on the context in the `ForwardingCacheFlushListener` prior to calling `context.forward`, and we also need to reset the `ProcessorNode` to the previous node once `context.forward` has completed (a minimal sketch of this pattern follows the topology below).

```
final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
    @Override
    public KeyValue<String, Long> apply(final Long key, final String value) {
        return new KeyValue<>(value, key);
    }
}, Serdes.String(), Serdes.Long())
        .reduce(new Reducer<Long>() {..}, new Reducer<Long>() {..}, "reducer-store");

one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {..})
        .mapValues(new ValueMapper<String, String>() {..});

```
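The fix, shown in full in the `ForwardingCacheFlushListener` diff below, wraps `context.forward` in a save/set/restore of the current node. Here is a minimal sketch of that pattern using stand-in interfaces rather than the real Kafka internals (`Node` and `Context` below are simplified placeholders, not the actual `ProcessorNode`/`InternalProcessorContext` types):

```
// Stand-in interfaces; they only mirror the calls the fix relies on.
interface Node { }

interface Context {
    Node currentNode();
    void setCurrentNode(Node node);
    <K, V> void forward(K key, V value);
}

final class ForwardingListenerSketch<K, V> {
    private final Context context;
    private final Node myNode; // the node that was current when the listener was created

    ForwardingListenerSketch(final Context context) {
        this.context = context;
        this.myNode = context.currentNode();
    }

    void apply(final K key, final V newValue) {
        final Node prev = context.currentNode(); // remember whichever node is currently processing
        context.setCurrentNode(myNode);          // forward as if from the node that owns the cached store
        try {
            context.forward(key, newValue);
        } finally {
            context.setCurrentNode(prev);        // always restore, even if forward() throws
        }
    }
}
```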

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2051 from dguy/kafka-4311


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1fb615a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1fb615a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1fb615a

Branch: refs/heads/0.10.1
Commit: c1fb615a6ef6cbd0a725bd70a2a602ca31402f8a
Parents: f91d95a
Author: Damian Guy <da...@gmail.com>
Authored: Wed Nov 9 10:43:27 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:52:57 2016 -0800

----------------------------------------------------------------------
 .../internals/ForwardingCacheFlushListener.java | 22 ++++--
 .../internals/InternalProcessorContext.java     |  1 +
 .../internals/ProcessorContextImpl.java         | 13 ++--
 .../processor/internals/StandbyContextImpl.java |  5 ++
 .../streams/processor/internals/StreamTask.java | 28 +++-----
 .../streams/state/internals/NamedCache.java     |  8 ++-
 .../kstream/internals/KTableAggregateTest.java  | 72 ++++++++++++++++++++
 .../internals/ProcessorTopologyTest.java        |  1 +
 .../streams/state/KeyValueStoreTestDriver.java  |  7 ++
 .../streams/state/internals/NamedCacheTest.java |  6 ++
 .../apache/kafka/test/KStreamTestDriver.java    | 23 ++++++-
 .../apache/kafka/test/MockProcessorContext.java |  7 ++
 12 files changed, 160 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
index 1796be9..4635fc9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
@@ -17,22 +17,32 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
-    private final ProcessorContext context;
+    private final InternalProcessorContext context;
     private final boolean sendOldValues;
+    private final ProcessorNode myNode;
 
     ForwardingCacheFlushListener(final ProcessorContext context, final boolean sendOldValues) {
-        this.context = context;
+        this.context = (InternalProcessorContext) context;
+        myNode = this.context.currentNode();
         this.sendOldValues = sendOldValues;
     }
 
     @Override
     public void apply(final K key, final V newValue, final V oldValue) {
-        if (sendOldValues) {
-            context.forward(key, new Change<>(newValue, oldValue));
-        } else {
-            context.forward(key, new Change<>(newValue, null));
+        final ProcessorNode prev = context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            if (sendOldValues) {
+                context.forward(key, new Change<>(newValue, oldValue));
+            } else {
+                context.forward(key, new Change<>(newValue, null));
+            }
+        } finally {
+            context.setCurrentNode(prev);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 251ff3f..016964b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -42,6 +42,7 @@ public interface InternalProcessorContext extends ProcessorContext {
      */
     void setCurrentNode(ProcessorNode currentNode);
 
+    ProcessorNode currentNode();
     /**
      * Get the thread-global cache
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 195e5a4..be18593 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -128,13 +128,11 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
      */
     @Override
     public StateStore getStateStore(String name) {
-        ProcessorNode node = task.node();
-
-        if (node == null)
+        if (currentNode == null)
             throw new TopologyBuilderException("Accessing from an unknown node");
 
-        if (!node.stateStores.contains(name)) {
-            throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
+        if (!currentNode.stateStores.contains(name)) {
+            throw new TopologyBuilderException("Processor " + currentNode.name() + " has no access to StateStore " + name);
         }
 
         return stateMgr.getStore(name);
@@ -272,4 +270,9 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
     public void setCurrentNode(final ProcessorNode currentNode) {
         this.currentNode = currentNode;
     }
+
+    @Override
+    public ProcessorNode currentNode() {
+        return currentNode;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 563dbce..80c0026 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -222,4 +222,9 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
     public void setCurrentNode(final ProcessorNode currentNode) {
         // no-op. can't throw as this is called on commit when the StateStores get flushed.
     }
+
+    @Override
+    public ProcessorNode currentNode() {
+        throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b993054..9a2f03e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -59,7 +59,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
-    private ProcessorNode currNode = null;
 
     private boolean requiresPoll = true;
 
@@ -122,11 +121,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // initialize the task by initializing all its processor nodes in the topology
         log.info("{} Initializing processor nodes of the topology", logPrefix);
         for (ProcessorNode node : this.topology.processors()) {
-            this.currNode = node;
+            processorContext.setCurrentNode(node);
             try {
                 node.init(this.processorContext);
             } finally {
-                this.currNode = null;
+                processorContext.setCurrentNode(null);
             }
         }
 
@@ -172,13 +171,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         try {
             // process the record by passing to the source node of the topology
-            this.currNode = recordInfo.node();
+            final ProcessorNode currNode = recordInfo.node();
             TopicPartition partition = recordInfo.partition();
 
             log.trace("{} Start processing one record [{}]", logPrefix, record);
             final ProcessorRecordContext recordContext = createRecordContext(record);
             updateProcessorContext(recordContext, currNode);
-            this.currNode.process(record.key(), record.value());
+            currNode.process(record.key(), record.value());
 
             log.trace("{} Completed processing one record [{}]", logPrefix, record);
 
@@ -199,14 +198,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
         } catch (KafkaException ke) {
             throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d",
                                               id.toString(),
-                                              currNode.name(),
+                                              processorContext.currentNode().name(),
                                               record.topic(),
                                               record.partition(),
                                               record.offset()
                                               ), ke);
         } finally {
             processorContext.setCurrentNode(null);
-            this.currNode = null;
         }
 
         return partitionGroup.numBuffered();
@@ -241,10 +239,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     @Override
     public void punctuate(ProcessorNode node, long timestamp) {
-        if (currNode != null)
+        if (processorContext.currentNode() != null)
             throw new IllegalStateException(String.format("%s Current node is not null", logPrefix));
 
-        currNode = node;
         final StampedRecord stampedRecord = new StampedRecord(DUMMY_RECORD, timestamp);
         updateProcessorContext(createRecordContext(stampedRecord), node);
 
@@ -256,15 +253,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
             throw new StreamsException(String.format("Exception caught in punctuate. taskId=%s processor=%s", id,  node.name()), ke);
         } finally {
             processorContext.setCurrentNode(null);
-            currNode = null;
         }
     }
 
 
-    public ProcessorNode node() {
-        return this.currNode;
-    }
-
     /**
      * Commit the current task state
      */
@@ -322,10 +314,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @throws IllegalStateException if the current node is not null
      */
     public void schedule(long interval) {
-        if (currNode == null)
+        if (processorContext.currentNode() == null)
             throw new IllegalStateException(String.format("%s Current node is null", logPrefix));
 
-        punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
+        punctuationQueue.schedule(new PunctuationSchedule(processorContext.currentNode(), interval));
     }
 
     /**
@@ -342,13 +334,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // make sure close() is called for each node even when there is a RuntimeException
         RuntimeException exception = null;
         for (ProcessorNode node : this.topology.processors()) {
-            currNode = node;
+            processorContext.setCurrentNode(node);
             try {
                 node.close();
             } catch (RuntimeException e) {
                 exception = e;
             } finally {
-                currNode = null;
+                processorContext.setCurrentNode(null);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 65a836e..ab771df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -109,7 +109,7 @@ class NamedCache {
         for (Bytes key : dirtyKeys) {
             final LRUNode node = getInternal(key);
             if (node == null) {
-                throw new IllegalStateException("Key found in dirty key set, but entry is null");
+                throw new IllegalStateException("Key = " + key + " found in dirty key set, but entry is null");
             }
             entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry));
             node.entry.markClean();
@@ -120,6 +120,12 @@ class NamedCache {
 
 
     synchronized void put(final Bytes key, final LRUCacheEntry value) {
+        if (!value.isDirty && dirtyKeys.contains(key)) {
+            throw new IllegalStateException(String.format("Attempting to put a clean entry for key [%s] " +
+                                                                  "into NamedCache [%s] when it already contains " +
+                                                                  "a dirty entry for the same key",
+                                                          key, name));
+        }
         LRUNode node = cache.get(key);
         if (node != null) {
             numOverwrites++;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index ba33d5c..8378a79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -22,10 +22,14 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -39,6 +43,8 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -320,4 +326,70 @@ public class KTableAggregateTest {
                  "1:2"
                  ), proc.processed);
     }
+
+    @Test
+    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() throws Exception {
+        final String tableOne = "tableOne";
+        final String tableTwo = "tableTwo";
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String reduceTopic = "TestDriver-reducer-store-repartition";
+        final Map<String, Long> reduceResults = new HashMap<>();
+
+        final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
+        final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
+
+
+        final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
+            @Override
+            public KeyValue<String, Long> apply(final Long key, final String value) {
+                return new KeyValue<>(value, key);
+            }
+        }, Serdes.String(), Serdes.Long())
+                .reduce(new Reducer<Long>() {
+                    @Override
+                    public Long apply(final Long value1, final Long value2) {
+                        return value1 + value2;
+                    }
+                }, new Reducer<Long>() {
+                    @Override
+                    public Long apply(final Long value1, final Long value2) {
+                        return value1 - value2;
+                    }
+                }, "reducer-store");
+
+        reduce.foreach(new ForeachAction<String, Long>() {
+            @Override
+            public void apply(final String key, final Long value) {
+                reduceResults.put(key, value);
+            }
+        });
+
+        one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
+            @Override
+            public String apply(final String value1, final Long value2) {
+                return value1 + ":" + value2;
+            }
+        })
+                .mapValues(new ValueMapper<String, String>() {
+                    @Override
+                    public String apply(final String value) {
+                        return value;
+                    }
+                });
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, 111);
+        driver.process(reduceTopic, "1", new Change<>(1L, null));
+        driver.process("tableOne", "2", "2");
+        // this should trigger eviction on the reducer-store topic
+        driver.process(reduceTopic, "2", new Change<>(2L, null));
+        // this wont as it is the same value
+        driver.process(reduceTopic, "2", new Change<>(2L, null));
+        assertEquals(Long.valueOf(2L), reduceResults.get("2"));
+
+        // this will trigger eviction on the tableOne topic
+        // that in turn will cause an eviction on reducer-topic. It will flush
+        // key 2 as it is the only dirty entry in the cache
+        driver.process("tableOne", "1", "5");
+        assertEquals(Long.valueOf(4L), reduceResults.get("2"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 54ee43c..a146316 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -280,6 +280,7 @@ public class ProcessorTopologyTest {
                 .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
     }
 
+
     /**
      * A processor that simply forwards all messages to all children.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index e84e9ba..aca974b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorContext;
@@ -269,6 +270,12 @@ public class KeyValueStoreTestDriver<K, V> {
             public Map<String, Object> appConfigsWithPrefix(String prefix) {
                 return new StreamsConfig(props).originalsWithPrefix(prefix);
             }
+
+            @Override
+            public ProcessorNode currentNode() {
+                return null;
+            }
+
             @Override
             public ThreadCache getCache() {
                 return cache;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 3067256..5c0d511 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -191,4 +191,10 @@ public class NamedCacheTest {
     public void shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() throws Exception {
         cache.evict();
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() throws Exception {
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, false, 0, 0, 0, ""));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index ac58f37..05abbc6 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -56,14 +56,26 @@ public class KStreamTestDriver {
         this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
+    public KStreamTestDriver(KStreamBuilder builder, File stateDir, final long cacheSize) {
+        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
+    }
+
     public KStreamTestDriver(KStreamBuilder builder,
                              File stateDir,
                              Serde<?> keySerde,
                              Serde<?> valSerde) {
+        this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
+    }
+
+    public KStreamTestDriver(KStreamBuilder builder,
+                             File stateDir,
+                             Serde<?> keySerde,
+                             Serde<?> valSerde,
+                             long cacheSize) {
         builder.setApplicationId("TestDriver");
         this.topology = builder.build(null);
         this.stateDir = stateDir;
-        this.cache = new ThreadCache(DEFAULT_CACHE_SIZE_BYTES);
+        this.cache = new ThreadCache(cacheSize);
         this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
 
@@ -73,13 +85,14 @@ public class KStreamTestDriver {
         }
 
         for (ProcessorNode node : topology.processors()) {
-            currNode = node;
+            context.setCurrentNode(node);
             try {
                 node.init(context);
             } finally {
-                currNode = null;
+                context.setCurrentNode(null);
             }
         }
+
     }
 
     public ProcessorContext context() {
@@ -225,6 +238,10 @@ public class KStreamTestDriver {
 
     }
 
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        currNode = currentNode;
+    }
+
 
     private class MockRecordCollector extends RecordCollector {
         public MockRecordCollector() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fb615a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 8ad2fa9..cafdd9e 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -51,6 +51,7 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
 
     long timestamp = -1L;
     private RecordContext recordContext;
+    private ProcessorNode currentNode;
 
     public MockProcessorContext(StateSerdes<?, ?> serdes, RecordCollector collector) {
         this(null, null, serdes.keySerde(), serdes.valueSerde(), collector, null);
@@ -248,7 +249,13 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
 
     @Override
     public void setCurrentNode(final ProcessorNode currentNode) {
+        this.currentNode  = currentNode;
+        driver.setCurrentNode(currentNode);
+    }
 
+    @Override
+    public ProcessorNode currentNode() {
+        return currentNode;
     }
 
 }


[2/4] kafka git commit: MINOR: fix incorrect logging in StreamThread

Posted by gu...@apache.org.
MINOR: fix incorrect logging in StreamThread

Fix incorrect logging when unable to create an active task. The output was `Failed to create an active task %s:`, but it should include the taskId: the message used a `%s` placeholder, which the SLF4J-style logger does not substitute (it expects `{}`), so the taskId never appeared in the log.
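
As an illustration of the placeholder mismatch (a minimal sketch assuming an SLF4J API and binding on the classpath; the actual change is the one-line diff below):

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlaceholderExample {
    private static final Logger log = LoggerFactory.getLogger(PlaceholderExample.class);

    public static void main(final String[] args) {
        final String logPrefix = "stream-thread [t-1]";
        final String taskId = "0_1";
        final Exception e = new RuntimeException("boom");

        // Broken: "%s" is not an SLF4J placeholder, so it is printed literally and the
        // taskId argument is silently ignored.
        log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);

        // Fixed: both "{}" markers are substituted and the trailing Throwable still
        // gets its stack trace logged.
        log.error("{} Failed to create an active task {}: ", logPrefix, taskId, e);
    }
}
```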

Author: Damian Guy <da...@gmail.com>

Reviewers: Ismael Juma, Eno Thereska

Closes #2109 from dguy/minor-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c000eb22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c000eb22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c000eb22

Branch: refs/heads/0.10.1
Commit: c000eb22ae82184f3ec2bfeb91609bef7a1e875d
Parents: c1fb615
Author: Damian Guy <da...@gmail.com>
Authored: Wed Nov 9 11:22:28 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 08:51:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c000eb22/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d7bb98c..8b7478d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -663,7 +663,7 @@ public class StreamThread extends Thread {
                 for (TopicPartition partition : partitions)
                     activeTasksByPartition.put(partition, task);
             } catch (StreamsException e) {
-                log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
+                log.error("{} Failed to create an active task {}: ", logPrefix, taskId, e);
                 throw e;
             }
         }


[4/4] kafka git commit: KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

Posted by gu...@apache.org.
KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

  - bug-fix follow up
  - Resetter fails if no intermediate topic is used, because calling seekToEnd() with an empty partition collection seeks ALL assigned partitions to the end of the log (see the sketch below)
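
A minimal sketch of the guard this patch adds (a simplified stand-in, not the resetter's actual code; the real change is in the StreamsResetter diff below). Consumer#seekToBeginning and Consumer#seekToEnd treat an empty collection as "all currently assigned partitions", so the calls must be skipped when there is nothing to seek:

```
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class SeekGuard {
    // Only seek when there are partitions to act on; an empty collection would otherwise
    // mean "all currently assigned partitions" and move every partition's offset.
    static void seek(final Consumer<byte[], byte[]> client,
                     final Collection<TopicPartition> toBeginning,
                     final Collection<TopicPartition> toEnd) {
        if (!toBeginning.isEmpty()) {
            client.seekToBeginning(toBeginning);
        }
        if (!toEnd.isEmpty()) {
            client.seekToEnd(toEnd);
        }
    }
}
```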

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Michael G. Noll, Roger Hoover, Guozhang Wang

Closes #2138 from mjsax/kafka-4331-streams-resetter-bugfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ecb51680
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ecb51680
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ecb51680

Branch: refs/heads/0.10.1
Commit: ecb51680a982120691becc37c302aa78135dcfdf
Parents: 9d3003b
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 23 17:31:34 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 20:58:18 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java |   8 +-
 .../integration/ResetIntegrationTest.java       | 195 +++++++++++++++----
 2 files changed, 164 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 1bb63f7..828eeef 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -218,8 +218,12 @@ public class StreamsResetter {
                 }
             }
 
-            client.seekToBeginning(inputAndInternalTopicPartitions);
-            client.seekToEnd(intermediateTopicPartitions);
+            if (inputAndInternalTopicPartitions.size() > 0) {
+                client.seekToBeginning(inputAndInternalTopicPartitions);
+            }
+            if (intermediateTopicPartitions.size() > 0) {
+                client.seekToEnd(intermediateTopicPartitions);
+            }
 
             for (final TopicPartition p : partitions) {
                 client.position(p);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d07970f..5f85536 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -21,6 +21,7 @@ import kafka.tools.StreamsResetter;
 import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -40,7 +41,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -75,10 +76,11 @@ public class ResetIntegrationTest {
 
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+    private static final int TIMEOUT_MULTIPLYER = 5;
 
     private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
-
-    private AdminClient adminClient = null;
+    private static int testNo = 0;
+    private static AdminClient adminClient = null;
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
@@ -86,24 +88,48 @@ public class ResetIntegrationTest {
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_2);
         CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
-        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
-    }
-
-    @Before
-    public void prepare() {
-        adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
     }
 
-    @After
-    public void cleanup() {
+    @AfterClass
+    public static void globalCleanup() {
         if (adminClient != null) {
             adminClient.close();
             adminClient = null;
         }
     }
 
+    @Before
+    public void cleanup() throws Exception {
+        ++testNo;
+
+        if (adminClient == null) {
+            adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
+        }
+
+        // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
+        while (true) {
+            Thread.sleep(50);
+
+            try {
+                TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                        "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+            } catch (GroupCoordinatorNotAvailableException e) {
+                continue;
+            } catch (IllegalArgumentException e) {
+                continue;
+            }
+            break;
+        }
+
+        if (testNo == 1) {
+            prepareInputData();
+        }
+    }
+
     @Test
-    public void testReprocessingFromScratchAfterReset() throws Exception {
+    public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
+        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
+
         final Properties streamsConfiguration = prepareTest();
         final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
             CLUSTER.bootstrapServers(),
@@ -111,11 +137,8 @@ public class ResetIntegrationTest {
             LongDeserializer.class,
             LongDeserializer.class);
 
-        prepareInputData();
-        final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);
-
         // RUN
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
         streams.start();
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
@@ -131,17 +154,17 @@ public class ResetIntegrationTest {
         ).get(0);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
-        streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
-        cleanGlobal();
-        TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(INTERMEDIATE_USER_TOPIC);
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
-        assertInternalTopicsGotDeleted();
+        assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
 
         // RE-RUN
         streams.start();
@@ -159,11 +182,82 @@ public class ResetIntegrationTest {
 
         assertThat(resultRerun, equalTo(result));
         assertThat(resultRerun2, equalTo(result2));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(INTERMEDIATE_USER_TOPIC);
+
+        CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC);
+        Set<String> allTopics;
+        ZkUtils zkUtils = null;
+        try {
+            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+                    30000,
+                    30000,
+                    JaasUtils.isZkSecurityEnabled());
+
+            do {
+                Utils.sleep(100);
+                allTopics = new HashSet<>();
+                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            } while (allTopics.contains(INTERMEDIATE_USER_TOPIC));
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
+
+    @Test
+    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
+        final Properties streamsConfiguration = prepareTest();
+        final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+                LongDeserializer.class,
+                LongDeserializer.class);
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                resultTopicConsumerConfig,
+                OUTPUT_TOPIC,
+                10,
+                60000);
+
+        streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
+                "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+        // RESET
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.cleanUp();
+        cleanGlobal(null);
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+        assertInternalTopicsGotDeleted(null);
+
+        // RE-RUN
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                resultTopicConsumerConfig,
+                OUTPUT_TOPIC,
+                10,
+                60000);
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(null);
     }
 
     private Properties prepareTest() throws Exception {
         final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -205,7 +299,7 @@ public class ResetIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
     }
 
-    private KStreamBuilder setupTopology(final String outputTopic2) {
+    private KStreamBuilder setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
         final KStreamBuilder builder = new KStreamBuilder();
 
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@@ -251,27 +345,54 @@ public class ResetIntegrationTest {
         return builder;
     }
 
-    private void cleanGlobal() {
-        final Properties cleanUpConfig = new Properties();
-        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+    private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
-        final int exitCode = new StreamsResetter().run(
-            new String[]{
-                "--application-id", APP_ID,
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
+            @Override
+            public KeyValue<Long, Long> apply(final Long key, final String value) {
+                return new KeyValue<>(key, key);
+            }
+        }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
+
+        return builder;
+    }
+
+    private void cleanGlobal(final String intermediateUserTopic) {
+        final String[] parameters;
+        if (intermediateUserTopic != null) {
+            parameters = new String[]{
+                "--application-id", APP_ID + testNo,
                 "--bootstrap-server", CLUSTER.bootstrapServers(),
                 "--zookeeper", CLUSTER.zKConnectString(),
                 "--input-topics", INPUT_TOPIC,
                 "--intermediate-topics", INTERMEDIATE_USER_TOPIC
-            },
-            cleanUpConfig);
+            };
+        } else {
+            parameters = new String[]{
+                "--application-id", APP_ID + testNo,
+                "--bootstrap-server", CLUSTER.bootstrapServers(),
+                "--zookeeper", CLUSTER.zKConnectString(),
+                "--input-topics", INPUT_TOPIC
+            };
+        }
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
         Assert.assertEquals(0, exitCode);
     }
 
-    private void assertInternalTopicsGotDeleted() {
+    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
         final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
         expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
-        expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
+        if (intermediateUserTopic != null) {
+            expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic);
+        }
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
@@ -301,7 +422,7 @@ public class ResetIntegrationTest {
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {
         @Override
         public boolean conditionMet() {
-            return adminClient.describeGroup(APP_ID).members().isEmpty();
+            return adminClient.describeGroup(APP_ID + testNo).members().isEmpty();
         }
     }
 


[3/4] kafka git commit: KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger

Posted by gu...@apache.org.
KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger

The `StoreChangeLogger` currently keeps a cache of dirty and removed keys and batches the changelog records so that we don't send a record for each update. However, with KIP-63 this is unnecessary, as the batching and de-duplication are done by the caching layer. Further, the `StoreChangeLogger` relies on `context.timestamp()`, which is likely to be incorrect when caching is enabled (a sketch of the simplified logging shape follows).
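
A minimal sketch of the shape the logger moves to (hypothetical names, not the real Kafka classes): every change is written to the changelog as it happens, and batching/de-duplication is left to the KIP-63 record cache upstream, so no dirty/removed key sets or value getters are needed:

```
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for the new logger shape: each change is forwarded to
// the changelog immediately; de-duplication happens upstream in the record cache, so no
// dirty/removed key sets are tracked and no value getter is needed at flush time.
final class SimpleChangeLogger<K, V> {
    private final BiConsumer<K, V> changelogWriter; // e.g. sends a record to the changelog topic

    SimpleChangeLogger(final BiConsumer<K, V> changelogWriter) {
        this.changelogWriter = changelogWriter;
    }

    void logChange(final K key, final V value) {
        changelogWriter.accept(key, value); // value == null represents a delete (tombstone)
    }
}
```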

Author: Damian Guy <da...@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2103 from dguy/store-change-logger


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d3003b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d3003b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d3003b3

Branch: refs/heads/0.10.1
Commit: 9d3003b32dfc744c478277123c8878fa65dd9ec7
Parents: c000eb2
Author: Damian Guy <da...@gmail.com>
Authored: Fri Nov 11 10:21:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 08:55:48 2016 -0800

----------------------------------------------------------------------
 .../state/internals/CachingKeyValueStore.java   | 21 +++--
 .../internals/InMemoryKeyValueLoggedStore.java  | 23 ++---
 .../streams/state/internals/RocksDBStore.java   | 24 ++----
 .../state/internals/RocksDBWindowStore.java     | 26 ++----
 .../state/internals/StoreChangeLogger.java      | 89 +++-----------------
 .../internals/RocksDBKeyValueStoreTest.java     |  8 +-
 .../state/internals/StoreChangeLoggerTest.java  | 53 +++---------
 .../apache/kafka/test/KStreamTestDriver.java    |  5 +-
 8 files changed, 61 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 81ff5b5..ab050b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.ArrayList;
 import java.util.List;
 
 class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStore<K, V> {
@@ -78,27 +77,27 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
         cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
-                final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
                 for (ThreadCache.DirtyEntry entry : entries) {
-                    keyValues.add(KeyValue.pair(entry.key(), entry.newValue()));
-                    maybeForward(entry, (InternalProcessorContext) context);
+                    putAndMaybeForward(entry, (InternalProcessorContext) context);
                 }
-                underlying.putAll(keyValues);
             }
         });
 
     }
 
-    private void maybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
-        if (flushListener != null) {
-            final RecordContext current = context.recordContext();
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
+        final RecordContext current = context.recordContext();
+        try {
             context.setRecordContext(entry.recordContext());
-            try {
+            if (flushListener != null) {
+
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
                                     serdes.valueFrom(entry.newValue()), serdes.valueFrom(underlying.get(entry.key())));
-            } finally {
-                context.setRecordContext(current);
+
             }
+            underlying.put(entry.key(), entry.newValue());
+        } finally {
+            context.setRecordContext(current);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 4f056ec..d81f6fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -35,7 +35,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     private final String storeName;
 
     private StoreChangeLogger<K, V> changeLogger;
-    private StoreChangeLogger.ValueGetter<K, V> getter;
+    private ProcessorContext context;
 
     public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
         this.storeName = storeName;
@@ -52,6 +52,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
+        this.context = context;
         inner.init(context, root);
 
         // construct the serde
@@ -61,12 +62,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
-        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
-            @Override
-            public V get(K key) {
-                return inner.get(key);
-            }
-        };
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (inner instanceof MemoryLRUCache) {
@@ -98,16 +93,14 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     public void put(K key, V value) {
         this.inner.put(key, value);
 
-        changeLogger.add(key);
-        changeLogger.maybeLogChange(this.getter);
+        changeLogger.logChange(key, value);
     }
 
     @Override
     public V putIfAbsent(K key, V value) {
         V originalValue = this.inner.putIfAbsent(key, value);
         if (originalValue == null) {
-            changeLogger.add(key);
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(key, value);
         }
         return originalValue;
     }
@@ -118,9 +111,8 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
         for (KeyValue<K, V> entry : entries) {
             K key = entry.key;
-            changeLogger.add(key);
+            changeLogger.logChange(key, entry.value);
         }
-        changeLogger.maybeLogChange(this.getter);
     }
 
     @Override
@@ -139,8 +131,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
      * @param key the key for the entry that the inner store removed
      */
     protected void removed(K key) {
-        changeLogger.delete(key);
-        changeLogger.maybeLogChange(this.getter);
+        changeLogger.logChange(key, null);
     }
 
     @Override
@@ -166,7 +157,5 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public void flush() {
         this.inner.flush();
-
-        changeLogger.logChange(getter);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e27ffd8..41d633e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -98,9 +98,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private boolean loggingEnabled = false;
 
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
-    private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
     protected volatile boolean open = false;
+    private ProcessorContext context;
 
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
@@ -142,6 +142,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
+        this.context = context;
         final Map<String, Object> configs = context.appConfigs();
         final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
         if (configSetterClass != null) {
@@ -165,13 +166,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
-            @Override
-            public byte[] get(Bytes key) {
-                return getInternal(key.get());
-            }
-        };
-
         context.register(root, loggingEnabled, new StateRestoreCallback() {
 
             @Override
@@ -247,8 +241,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         putInternal(rawKey, rawValue);
 
         if (loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
         }
     }
 
@@ -292,16 +285,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 if (entry.value == null) {
                     db.remove(rawKey);
                 } else {
-                    batch.put(rawKey, serdes.rawValue(entry.value));
+                    final byte[] value = serdes.rawValue(entry.value);
+                    batch.put(rawKey, value);
                     if (loggingEnabled) {
-                        changeLogger.add(Bytes.wrap(rawKey));
+                        changeLogger.logChange(Bytes.wrap(rawKey), value);
                     }
                 }
             }
             db.write(wOptions, batch);
-            if (loggingEnabled) {
-                changeLogger.maybeLogChange(getter);
-            }
         } catch (RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
         }
@@ -371,9 +362,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         if (db == null) {
             return;
         }
-        if (loggingEnabled) {
-            changeLogger.logChange(getter);
-        }
         // flush RocksDB
         flushInternal();
     }
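
The RocksDB store follows the same pattern: each value is serialized once, the same bytes go to both the write batch and the changelog, and `flush()` no longer replays a dirty set. A sketch (not part of the patch) of the batched write loop after this change, assuming the `serdes`, `db`, `batch`, `wOptions`, `loggingEnabled` and `changeLogger` fields shown above:

```
    // Sketch only: putAll() now logs the exact bytes it writes to RocksDB.
    for (KeyValue<K, V> entry : entries) {
        byte[] rawKey = serdes.rawKey(entry.key);
        if (entry.value == null) {
            db.remove(rawKey);
        } else {
            byte[] rawValue = serdes.rawValue(entry.value);
            batch.put(rawKey, rawValue);
            if (loggingEnabled) {
                changeLogger.logChange(Bytes.wrap(rawKey), rawValue);   // same bytes, no second lookup
            }
        }
    }
    db.write(wOptions, batch);
```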

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index dd24320..b563137 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -142,7 +142,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
-    private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
     private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
 
     private ProcessorContext context;
@@ -166,11 +165,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         this.retainDuplicates = retainDuplicates;
 
-        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
-            public byte[] get(Bytes key) {
-                return getInternal(key.get());
-            }
-        };
 
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
@@ -262,9 +256,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 segment.flush();
             }
         }
-
-        if (loggingEnabled)
-            changeLogger.logChange(this.getter);
     }
 
     @Override
@@ -279,25 +270,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        byte[] rawKey = putAndReturnInternalKey(key, value, context.timestamp());
-
-        if (rawKey != null && loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
-        }
+        put(key, value, context.timestamp());
     }
 
     @Override
     public void put(K key, V value, long timestamp) {
-        byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
+        final byte[] rawValue = serdes.rawValue(value);
+        byte[] rawKey = putAndReturnInternalKey(key, rawValue, timestamp);
 
         if (rawKey != null && loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
         }
     }
 
-    private byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
+    private byte[] putAndReturnInternalKey(K key, byte[] value, long timestamp) {
         long segmentId = segmentId(timestamp);
 
         if (segmentId > currentSegmentId) {
@@ -312,7 +298,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
-            segment.put(Bytes.wrap(binaryKey), serdes.rawValue(value));
+            segment.put(Bytes.wrap(binaryKey), value);
             return binaryKey;
         } else {
             return null;
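
For the window store, `put(key, value)` now simply delegates to `put(key, value, context.timestamp())`, and the value is serialized once and reused for both the segment write and the changelog record. From a processor's point of view nothing changes; a small illustrative sketch (store name and keys are hypothetical, not from the patch):

```
    // Sketch only: both forms end up in putAndReturnInternalKey with pre-serialized bytes.
    WindowStore<String, Long> counts =
        (WindowStore<String, Long>) context.getStateStore("counts-window");
    counts.put("user-42", 1L);                                 // timestamp taken from context.timestamp()
    counts.put("user-42", 2L, context.timestamp() + 60_000L);  // explicit timestamp for the window
```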

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 41f9ae2..7aaddf8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -24,12 +24,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
- * Store change log collector that batches updates before sending to Kafka.
- *
  * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
  * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
  * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
@@ -37,97 +32,33 @@ import java.util.Set;
  * @param <K>
  * @param <V>
  */
-public class StoreChangeLogger<K, V> {
-
-    public interface ValueGetter<K, V> {
-        V get(K key);
-    }
-
-    // TODO: these values should be configurable
-    protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
+class StoreChangeLogger<K, V> {
 
     protected final StateSerdes<K, V> serialization;
 
     private final String topic;
     private final int partition;
     private final ProcessorContext context;
-    private final int maxDirty;
-    private final int maxRemoved;
-
-    protected Set<K> dirty;
-    protected Set<K> removed;
+    private final RecordCollector collector;
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
-        this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
-    }
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
-        init();
+    StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
+        this(storeName, context, context.taskId().partition, serialization);
     }
 
-    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
+    private StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization) {
         this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;
         this.serialization = serialization;
-        this.maxDirty = maxDirty;
-        this.maxRemoved = maxRemoved;
-    }
-
-    public void init() {
-        this.dirty = new HashSet<>();
-        this.removed = new HashSet<>();
+        this.collector = ((RecordCollector.Supplier) context).recordCollector();
     }
 
-    public void add(K key) {
-        this.dirty.add(key);
-        this.removed.remove(key);
-    }
-
-    public void delete(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-    }
-
-    public void maybeLogChange(ValueGetter<K, V> getter) {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange(getter);
-    }
-
-    public void logChange(ValueGetter<K, V> getter) {
-        if (this.removed.isEmpty() && this.dirty.isEmpty())
-            return;
-
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+    void logChange(final K key, final V value) {
         if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = getter.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
+            final Serializer<K> keySerializer = serialization.keySerializer();
+            final Serializer<V> valueSerializer = serialization.valueSerializer();
+            collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), key, value), keySerializer, valueSerializer);
         }
     }
-
-    public void clear() {
-        this.removed.clear();
-        this.dirty.clear();
-    }
-
-    // this is for test only
-    public int numDirty() {
-        return this.dirty.size();
-    }
-
-    // this is for test only
-    public int numRemoved() {
-        return this.removed.size();
-    }
 }
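
The logger itself is now a thin, stateless wrapper over the task's `RecordCollector`: each `logChange` call produces exactly one record on the store's changelog topic, stamped with the current `context.timestamp()`, and a null value acts as a delete marker. The class is package-private and internal, but its use from a store looks roughly like this (store name and keys are illustrative only):

```
    // Sketch only: one changelog record per call, no batching and no separate flush step.
    StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("counts", String.class, Long.class);
    StoreChangeLogger<String, Long> logger = new StoreChangeLogger<>("counts", context, serdes);

    logger.logChange("alice", 5L);    // one record on the store's changelog topic
    logger.logChange("alice", null);  // tombstone for "alice"
```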

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 25e0620..c3190b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -97,7 +97,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldPerformRangeQueriesWithCachingDisabled() throws Exception {
         final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        final MockProcessorContext context = (MockProcessorContext) driver.context();
+        final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+        context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
         final KeyValueIterator<Integer, String> range = store.range(1, 2);
@@ -109,7 +111,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldPerformAllQueriesWithCachingDisabled() throws Exception {
         final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        final MockProcessorContext context = (MockProcessorContext) driver.context();
+        final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+        context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
         final KeyValueIterator<Integer, String> range = store.all();
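
The added `setTime(1L)` calls are needed because `logChange` stamps each changelog record with `context.timestamp()`; tests that never process a record through the topology must therefore initialize the mock context's time before the first write, as sketched below:

```
    // Sketch only: give the mock context a timestamp before writing to a logged store.
    final MockProcessorContext context = (MockProcessorContext) driver.context();
    context.setTime(1L);   // supplies the timestamp used for the store's changelog records
    store.put(1, "hi");
```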

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7675f9b..fbfffb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -32,13 +32,13 @@ import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class StoreChangeLoggerTest {
 
     private final String topic = "topic";
 
     private final Map<Integer, String> logged = new HashMap<>();
-    private final Map<Integer, String> written = new HashMap<>();
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollector(null, "StoreChangeLoggerTest") {
@@ -57,49 +57,22 @@ public class StoreChangeLoggerTest {
             }
     );
 
-    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
+    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
 
-    private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
-        @Override
-        public String get(Integer key) {
-            return written.get(key);
-        }
-    };
 
     @Test
-    public void testAddRemove() {
+    public void testAddRemove() throws Exception {
         context.setTime(1);
-        written.put(0, "zero");
-        changeLogger.add(0);
-        written.put(1, "one");
-        changeLogger.add(1);
-        written.put(2, "two");
-        changeLogger.add(2);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
-
-        changeLogger.delete(0);
-        changeLogger.delete(1);
-        written.put(3, "three");
-        changeLogger.add(3);
-        assertEquals(2, changeLogger.numDirty());
-        assertEquals(2, changeLogger.numRemoved());
-
-        written.put(0, "zero-again");
-        changeLogger.add(0);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(1, changeLogger.numRemoved());
-
-        written.put(4, "four");
-        changeLogger.add(4);
-        changeLogger.maybeLogChange(getter);
-        assertEquals(0, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
-        assertEquals(5, logged.size());
-        assertEquals("zero-again", logged.get(0));
-        assertEquals(null, logged.get(1));
+        changeLogger.logChange(0, "zero");
+        changeLogger.logChange(1, "one");
+        changeLogger.logChange(2, "two");
+
+        assertEquals("zero", logged.get(0));
+        assertEquals("one", logged.get(1));
         assertEquals("two", logged.get(2));
-        assertEquals("three", logged.get(3));
-        assertEquals("four", logged.get(4));
+
+        changeLogger.logChange(0, null);
+        assertNull(logged.get(0));
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 05abbc6..14e15a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -100,12 +100,15 @@ public class KStreamTestDriver {
     }
 
     public void process(String topicName, Object key, Object value) {
+        final ProcessorNode previous = currNode;
         currNode = topology.source(topicName);
 
         // if currNode is null, check if this topic is a changelog topic;
         // if yes, skip
-        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))
+        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
+            currNode = previous;
             return;
+        }
         context.setRecordContext(createRecordContext(context.timestamp()));
         context.setCurrentNode(currNode);
         try {