Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/08 08:16:23 UTC

kafka git commit: HOTFIX: fix ProcessorStateManager to use correct ktable partitions

Repository: kafka
Updated Branches:
  refs/heads/trunk b390b15cd -> 268392f5e


HOTFIX: fix ProcessorStateManager to use correct ktable partitions

guozhangwang

* fix ProcessorStateManager to use correct ktable partitions
* more ktable tests

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #635 from ymatsuda/more_ktable_test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/268392f5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/268392f5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/268392f5

Branch: refs/heads/trunk
Commit: 268392f5e99dc3eab9d89e921bafde782a1d10a6
Parents: b390b15
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Dec 7 23:16:19 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Dec 7 23:16:19 2015 -0800

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |   2 +-
 .../internals/ProcessorStateManager.java        |  36 +++++-
 .../internals/ProcessorStateManagerTest.java    |  34 ++++--
 .../processor/internals/StandbyTaskTest.java    | 112 ++++++++++++++++++-
 .../kafka/test/MockStateStoreSupplier.java      |  20 +++-
 5 files changed, 184 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b3255bb..569f4e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -59,7 +59,7 @@ public abstract class AbstractTask {
         try {
             File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
             // if partitions is null, this is a standby task
-            this.stateMgr = new ProcessorStateManager(jobId, id.partition, stateFile, restoreConsumer, isStandby);
+            this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
             throw new KafkaException("Error while creating the state manager", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 579d245..3cac3f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -35,10 +35,13 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ProcessorStateManager {
 
@@ -49,10 +52,12 @@ public class ProcessorStateManager {
     public static final String LOCK_FILE_NAME = ".lock";
 
     private final String jobId;
-    private final int partition;
+    private final int defaultPartition;
+    private final Map<String, TopicPartition> partitionForTopic;
     private final File baseDir;
     private final FileLock directoryLock;
     private final Map<String, StateStore> stores;
+    private final Set<String> loggingEnabled;
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final Map<TopicPartition, Long> restoredOffsets;
     private final Map<TopicPartition, Long> checkpointedOffsets;
@@ -60,11 +65,16 @@ public class ProcessorStateManager {
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
 
-    public ProcessorStateManager(String jobId, int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+    public ProcessorStateManager(String jobId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
         this.jobId = jobId;
-        this.partition = partition;
+        this.defaultPartition = defaultPartition;
+        this.partitionForTopic = new HashMap<>();
+        for (TopicPartition source : sources) {
+            this.partitionForTopic.put(source.topic(), source);
+        }
         this.baseDir = baseDir;
         this.stores = new HashMap<>();
+        this.loggingEnabled = new HashSet<>();
         this.restoreConsumer = restoreConsumer;
         this.restoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
@@ -119,6 +129,9 @@ public class ProcessorStateManager {
         if (this.stores.containsKey(store.name()))
             throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
 
+        if (loggingEnabled)
+            this.loggingEnabled.add(store.name());
+
         // check whether the underlying change log topic exists or not
         String topic;
         if (loggingEnabled)
@@ -126,6 +139,7 @@ public class ProcessorStateManager {
         else topic = store.name();
 
         // block until the partition is ready for this state changelog topic or time has elapsed
+        int partition = getPartition(topic);
         boolean partitionNotFound = true;
         long startTime = System.currentTimeMillis();
         long waitTime = 5000L;      // hard-code the value since we should not block after KIP-4
@@ -146,7 +160,7 @@ public class ProcessorStateManager {
         } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
 
         if (partitionNotFound)
-            throw new KafkaException("Store " + store.name() + "'s change log does not contain partition " + partition);
+            throw new KafkaException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition);
 
         this.stores.put(store.name(), store);
 
@@ -165,7 +179,7 @@ public class ProcessorStateManager {
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new IllegalStateException("Restore consumer should not have subscribed to any partitions beforehand");
         }
-        TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), partition);
+        TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), getPartition(store.name()));
         restoreConsumer.assign(Collections.singletonList(storePartition));
 
         try {
@@ -212,6 +226,7 @@ public class ProcessorStateManager {
 
         for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
             String topicName = entry.getKey();
+            int partition = getPartition(topicName);
             TopicPartition storePartition = new TopicPartition(topicName, partition);
 
             if (checkpointedOffsets.containsKey(storePartition)) {
@@ -293,7 +308,11 @@ public class ProcessorStateManager {
 
             Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
             for (String storeName : stores.keySet()) {
-                TopicPartition part = new TopicPartition(storeChangelogTopic(jobId, storeName), partition);
+                TopicPartition part;
+                if (loggingEnabled.contains(storeName))
+                    part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+                else
+                    part = new TopicPartition(storeName, getPartition(storeName));
 
                 // only checkpoint the offset to the offsets file if it is persistent;
                 if (stores.get(storeName).persistent()) {
@@ -323,4 +342,9 @@ public class ProcessorStateManager {
         directoryLock.release();
     }
 
+    private int getPartition(String topic) {
+        TopicPartition partition = partitionForTopic.get(topic);
+
+        return partition == null ? defaultPartition : partition.partition();
+    }
 }
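
For context on the hunk above: the heart of the fix is this new getPartition() lookup. Before the patch, every state topic was pinned to the task's own partition id; now a state topic that matches one of the task's source topics (as a KTable's source topic does) inherits the partition id of that source instead. A minimal standalone sketch of the same rule, with illustrative class and method names that are not part of the patch:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: resolve the partition for a state topic, preferring
// the task's source partition when the topic is also a source topic.
class ChangelogPartitionResolver {
    private final int defaultPartition;
    private final Map<String, TopicPartition> partitionForTopic = new HashMap<>();

    ChangelogPartitionResolver(int defaultPartition, Iterable<TopicPartition> sources) {
        this.defaultPartition = defaultPartition;
        for (TopicPartition source : sources)
            partitionForTopic.put(source.topic(), source);
    }

    int partitionFor(String topic) {
        TopicPartition source = partitionForTopic.get(topic);
        // fall back to the task's own partition (the old behavior)
        return source == null ? defaultPartition : source.partition();
    }
}

// e.g. with sources = [("ktable1", 1)] and defaultPartition = 0:
//   partitionFor("ktable1")             -> 1  (inherited from the source)
//   partitionFor("job-store-changelog") -> 0  (falls back to the task partition)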

http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 7e5ce49..5e336cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -179,6 +180,7 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    private final Set<TopicPartition> noPartitions = Collections.emptySet();
     private final String jobId = "test-job";
     private final String stateDir = "test";
     private final String persistentStoreName = "persistentStore";
@@ -193,7 +195,7 @@ public class ProcessorStateManagerTest {
             FileLock lock;
 
             // the state manager locks the directory
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
 
             try {
                 // this should not get the lock
@@ -222,7 +224,7 @@ public class ProcessorStateManagerTest {
         try {
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
             } finally {
@@ -243,7 +245,6 @@ public class ProcessorStateManagerTest {
             checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
 
             restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
                     new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
@@ -253,7 +254,9 @@ public class ProcessorStateManagerTest {
             TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
             restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
-            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); // persistent store
+            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
+
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
@@ -292,7 +295,6 @@ public class ProcessorStateManagerTest {
             long lastCheckpointedOffset = 10L;
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
 
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
@@ -307,6 +309,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
@@ -344,15 +347,16 @@ public class ProcessorStateManagerTest {
             long lastCheckpointedOffset = 10L;
             String storeName1 = "store1";
             String storeName2 = "store2";
+            String storeName3 = "store3";
 
             String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
             String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
+            String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(jobId, storeName3);
 
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, baseDir, restoreConsumer, true); // standby
 
             restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
                     new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
@@ -360,9 +364,14 @@ public class ProcessorStateManagerTest {
             restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList(
                     new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])
             ));
+            restoreConsumer.updatePartitions(storeTopicName3, Utils.mkList(
+                    new PartitionInfo(storeTopicName3, 0, Node.noNode(), new Node[0], new Node[0]),
+                    new PartitionInfo(storeTopicName3, 1, Node.noNode(), new Node[0], new Node[0])
+            ));
 
             TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
             TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
+            TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
 
             Map<TopicPartition, Long> endOffsets = new HashMap<>();
             endOffsets.put(partition1, 13L);
@@ -371,20 +380,27 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
             MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
+            MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
 
+            // if there is a source partition, inherit the partition id
+            Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
             try {
                 restoreConsumer.reset();
 
                 stateMgr.register(store1, true, store1.stateRestoreCallback);
                 stateMgr.register(store2, true, store2.stateRestoreCallback);
+                stateMgr.register(store3, true, store3.stateRestoreCallback);
 
                 Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
 
-                assertEquals(2, changeLogOffsets.size());
+                assertEquals(3, changeLogOffsets.size());
                 assertTrue(changeLogOffsets.containsKey(partition1));
                 assertTrue(changeLogOffsets.containsKey(partition2));
+                assertTrue(changeLogOffsets.containsKey(partition3));
                 assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1));
                 assertEquals(-1L, (long) changeLogOffsets.get(partition2));
+                assertEquals(-1L, (long) changeLogOffsets.get(partition3));
 
             } finally {
                 stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -400,7 +416,6 @@ public class ProcessorStateManagerTest {
         File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
 
             restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
                     new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
@@ -408,6 +423,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -432,7 +448,6 @@ public class ProcessorStateManagerTest {
             oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
 
             restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
                     new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
@@ -450,6 +465,7 @@ public class ProcessorStateManagerTest {
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 // make sure the checkpoint file is deleted
                 assertFalse(checkpointFile.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 00b983d..b2f45fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -39,6 +40,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +48,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class StandbyTaskTest {
 
@@ -72,6 +75,15 @@ public class StandbyTaskTest {
             )
     );
 
+    private final TopicPartition ktable = new TopicPartition("ktable1", 0);
+    private final Set<TopicPartition> ktablePartitions = Utils.mkSet(ktable);
+    private final ProcessorTopology ktableTopology = new ProcessorTopology(
+            Collections.<ProcessorNode>emptyList(),
+            Collections.<String, SourceNode>emptyMap(),
+            Utils.<StateStoreSupplier>mkList(
+                    new MockStateStoreSupplier(ktable.topic(), true, false)
+            )
+    );
 
     private StreamingConfig createConfig(final File baseDir) throws Exception {
         return new StreamingConfig(new Properties() {
@@ -116,8 +128,6 @@ public class StandbyTaskTest {
 
     @Test
     public void testStorePartitions() throws Exception {
-        System.out.println("STARTED");
-
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
@@ -201,6 +211,104 @@ public class StandbyTaskTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testUpdateKTable() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            consumer.assign(Utils.mkList(ktable));
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(0L));
+            consumer.commitSync(committedOffsets);
+
+            restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
+                    new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]),
+                    new PartitionInfo("ktable1", 1, Node.noNode(), new Node[0], new Node[0]),
+                    new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
+            ));
+
+            StreamingConfig config = createConfig(baseDir);
+            StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
+
+            restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+            for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 1, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 2, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 3, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 4, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 5, 100))) {
+                restoreStateConsumer.bufferRecord(record);
+            }
+
+            for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
+                TopicPartition partition = entry.getKey();
+                long offset = entry.getValue();
+                if (offset >= 0) {
+                    restoreStateConsumer.seek(partition, offset);
+                } else {
+                    restoreStateConsumer.seekToBeginning(partition);
+                }
+            }
+
+            // The committed offset is 0L, so no records should be processed yet.
+            List<ConsumerRecord<byte[], byte[]>> remaining = task.update(ktable, restoreStateConsumer.poll(100).records(ktable));
+            assertEquals(5, remaining.size());
+
+            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(10L));
+            consumer.commitSync(committedOffsets);
+            task.commit(); // update offset limits
+
+            // The committed offset has not been reached yet.
+            remaining = task.update(ktable, remaining);
+            assertEquals(5, remaining.size());
+
+            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(11L));
+            consumer.commitSync(committedOffsets);
+            task.commit(); // update offset limits
+
+            // One record (offset 10, below the limit of 11) should be processed.
+            remaining = task.update(ktable, remaining);
+            assertEquals(4, remaining.size());
+
+            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(45L));
+            consumer.commitSync(committedOffsets);
+            task.commit(); // update offset limits
+
+            // The committed offset is now 45. All records except the last one should be processed.
+            remaining = task.update(ktable, remaining);
+            assertEquals(1, remaining.size());
+
+            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(50L));
+            consumer.commitSync(committedOffsets);
+            task.commit(); // update offset limits
+
+            // The committed offset is now 50; the limit is exclusive, so the last record (offset 50) still remains.
+            remaining = task.update(ktable, remaining);
+            assertEquals(1, remaining.size());
+
+            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(60L));
+            consumer.commitSync(committedOffsets);
+            task.commit(); // update offset limits
+
+            // The committed offset is now 60. No records should be left.
+            remaining = task.update(ktable, remaining);
+            assertNull(remaining);
+
+            task.close();
+
+            File taskDir = new File(baseDir, taskId.toString());
+            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+            Map<TopicPartition, Long> offsets = checkpoint.read();
+
+            assertEquals(1, offsets.size());
+            assertEquals(new Long(51L), offsets.get(ktable));
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }
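
The testUpdateKTable test added above drives the standby task through a sequence of commits and asserts that update() applies only the changelog records whose offsets fall strictly below the committed offset of the source topic, hands back the unprocessed remainder, and returns null once nothing is left. A hedged sketch of that gating loop; the real logic lives in StandbyTask and ProcessorStateManager, and the class and method below are illustrative only:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.StateRestoreCallback;

class OffsetLimitedRestore {
    // Apply records strictly below the committed limit; hand back the rest.
    // Mirrors what the test asserts: the limit is exclusive, and a null
    // return signals that no records remain.
    static List<ConsumerRecord<byte[], byte[]>> applyUpTo(
            long limit,
            List<ConsumerRecord<byte[], byte[]>> records,
            StateRestoreCallback restoreCallback) {
        List<ConsumerRecord<byte[], byte[]>> remaining = new ArrayList<>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() < limit)
                restoreCallback.restore(record.key(), record.value());
            else
                remaining.add(record);
        }
        return remaining.isEmpty() ? null : remaining;
    }
}

Under that rule the test's numbers line up: with buffered offsets 10 through 50 and a limit of 45, only the record at offset 50 survives; a limit of 50 still leaves it (the bound is exclusive), a limit of 60 clears it, and the final checkpoint stores 51, one past the last applied offset.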

http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 3b17afe..6810841 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -29,10 +29,16 @@ import java.util.ArrayList;
 public class MockStateStoreSupplier implements StateStoreSupplier {
     private final String name;
     private final boolean persistent;
+    private final boolean loggingEnabled;
 
     public MockStateStoreSupplier(String name, boolean persistent) {
+        this(name, persistent, true);
+    }
+
+    public MockStateStoreSupplier(String name, boolean persistent, boolean loggingEnabled) {
         this.name = name;
         this.persistent = persistent;
+        this.loggingEnabled = loggingEnabled;
     }
 
     @Override
@@ -42,13 +48,18 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
 
     @Override
     public StateStore get() {
-        return new MockStateStore(name, persistent);
+        if (loggingEnabled) {
+            return new MockStateStore(name, persistent);
+        } else {
+            return new MockStateStore(name, persistent).disableLogging();
+        }
     }
 
     public static class MockStateStore implements StateStore {
         private final String name;
         private final boolean persistent;
 
+        public boolean loggingEnabled = true;
         public boolean initialized = false;
         public boolean flushed = false;
         public boolean closed = false;
@@ -59,6 +70,11 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
             this.persistent = persistent;
         }
 
+        public MockStateStore disableLogging() {
+            loggingEnabled = false;
+            return this;
+        }
+
         @Override
         public String name() {
             return name;
@@ -66,7 +82,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
 
         @Override
         public void init(ProcessorContext context) {
-            context.register(this, true, stateRestoreCallback);
+            context.register(this, loggingEnabled, stateRestoreCallback);
             initialized = true;
         }