You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/08 08:16:23 UTC
kafka git commit: HOTFIX: fix ProcessorStateManager to use correct
ktable partitions
Repository: kafka
Updated Branches:
refs/heads/trunk b390b15cd -> 268392f5e
HOTFIX: fix ProcessorStateManager to use correct ktable partitions
guozhangwang
* fix ProcessorStateManager to use correct ktable partitions
* more ktable tests
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #635 from ymatsuda/more_ktable_test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/268392f5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/268392f5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/268392f5
Branch: refs/heads/trunk
Commit: 268392f5e99dc3eab9d89e921bafde782a1d10a6
Parents: b390b15
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Dec 7 23:16:19 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Dec 7 23:16:19 2015 -0800
----------------------------------------------------------------------
.../processor/internals/AbstractTask.java | 2 +-
.../internals/ProcessorStateManager.java | 36 +++++-
.../internals/ProcessorStateManagerTest.java | 34 ++++--
.../processor/internals/StandbyTaskTest.java | 112 ++++++++++++++++++-
.../kafka/test/MockStateStoreSupplier.java | 20 +++-
5 files changed, 184 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b3255bb..569f4e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -59,7 +59,7 @@ public abstract class AbstractTask {
try {
File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
// if partitions is null, this is a standby task
- this.stateMgr = new ProcessorStateManager(jobId, id.partition, stateFile, restoreConsumer, isStandby);
+ this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
throw new KafkaException("Error while creating the state manager", e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 579d245..3cac3f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -35,10 +35,13 @@ import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class ProcessorStateManager {
@@ -49,10 +52,12 @@ public class ProcessorStateManager {
public static final String LOCK_FILE_NAME = ".lock";
private final String jobId;
- private final int partition;
+ private final int defaultPartition;
+ private final Map<String, TopicPartition> partitionForTopic;
private final File baseDir;
private final FileLock directoryLock;
private final Map<String, StateStore> stores;
+ private final Set<String> loggingEnabled;
private final Consumer<byte[], byte[]> restoreConsumer;
private final Map<TopicPartition, Long> restoredOffsets;
private final Map<TopicPartition, Long> checkpointedOffsets;
@@ -60,11 +65,16 @@ public class ProcessorStateManager {
private final boolean isStandby;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
- public ProcessorStateManager(String jobId, int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+ public ProcessorStateManager(String jobId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
this.jobId = jobId;
- this.partition = partition;
+ this.defaultPartition = defaultPartition;
+ this.partitionForTopic = new HashMap<>();
+ for (TopicPartition source : sources) {
+ this.partitionForTopic.put(source.topic(), source);
+ }
this.baseDir = baseDir;
this.stores = new HashMap<>();
+ this.loggingEnabled = new HashSet<>();
this.restoreConsumer = restoreConsumer;
this.restoredOffsets = new HashMap<>();
this.isStandby = isStandby;
@@ -119,6 +129,9 @@ public class ProcessorStateManager {
if (this.stores.containsKey(store.name()))
throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
+ if (loggingEnabled)
+ this.loggingEnabled.add(store.name());
+
// check that the underlying change log topic exist or not
String topic;
if (loggingEnabled)
@@ -126,6 +139,7 @@ public class ProcessorStateManager {
else topic = store.name();
// block until the partition is ready for this state changelog topic or time has elapsed
+ int partition = getPartition(topic);
boolean partitionNotFound = true;
long startTime = System.currentTimeMillis();
long waitTime = 5000L; // hard-code the value since we should not block after KIP-4
@@ -146,7 +160,7 @@ public class ProcessorStateManager {
} while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
if (partitionNotFound)
- throw new KafkaException("Store " + store.name() + "'s change log does not contain partition " + partition);
+ throw new KafkaException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition);
this.stores.put(store.name(), store);
@@ -165,7 +179,7 @@ public class ProcessorStateManager {
if (!restoreConsumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
}
- TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), partition);
+ TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), getPartition(store.name()));
restoreConsumer.assign(Collections.singletonList(storePartition));
try {
@@ -212,6 +226,7 @@ public class ProcessorStateManager {
for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
String topicName = entry.getKey();
+ int partition = getPartition(topicName);
TopicPartition storePartition = new TopicPartition(topicName, partition);
if (checkpointedOffsets.containsKey(storePartition)) {
@@ -293,7 +308,11 @@ public class ProcessorStateManager {
Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
for (String storeName : stores.keySet()) {
- TopicPartition part = new TopicPartition(storeChangelogTopic(jobId, storeName), partition);
+ TopicPartition part;
+ if (loggingEnabled.contains(storeName))
+ part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+ else
+ part = new TopicPartition(storeName, getPartition(storeName));
// only checkpoint the offset to the offsets file if it is persistent;
if (stores.get(storeName).persistent()) {
@@ -323,4 +342,9 @@ public class ProcessorStateManager {
directoryLock.release();
}
+ private int getPartition(String topic) {
+ TopicPartition partition = partitionForTopic.get(topic);
+
+ return partition == null ? defaultPartition : partition.partition();
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 7e5ce49..5e336cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -41,6 +41,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -179,6 +180,7 @@ public class ProcessorStateManagerTest {
}
}
+ private final Set<TopicPartition> noPartitions = Collections.emptySet();
private final String jobId = "test-job";
private final String stateDir = "test";
private final String persistentStoreName = "persistentStore";
@@ -193,7 +195,7 @@ public class ProcessorStateManagerTest {
FileLock lock;
// the state manager locks the directory
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
try {
// this should not get the lock
@@ -222,7 +224,7 @@ public class ProcessorStateManagerTest {
try {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
try {
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
} finally {
@@ -243,7 +245,6 @@ public class ProcessorStateManagerTest {
checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
@@ -253,7 +254,9 @@ public class ProcessorStateManagerTest {
TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
- MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); // persistent store
+ MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
+
+ ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
try {
restoreConsumer.reset();
@@ -292,7 +295,6 @@ public class ProcessorStateManagerTest {
long lastCheckpointedOffset = 10L;
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
@@ -307,6 +309,7 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
+ ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
try {
restoreConsumer.reset();
@@ -344,15 +347,16 @@ public class ProcessorStateManagerTest {
long lastCheckpointedOffset = 10L;
String storeName1 = "store1";
String storeName2 = "store2";
+ String storeName3 = "store3";
String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
+ String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(jobId, storeName3);
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, baseDir, restoreConsumer, true); // standby
restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
@@ -360,9 +364,14 @@ public class ProcessorStateManagerTest {
restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList(
new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])
));
+ restoreConsumer.updatePartitions(storeTopicName3, Utils.mkList(
+ new PartitionInfo(storeTopicName3, 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo(storeTopicName3, 1, Node.noNode(), new Node[0], new Node[0])
+ ));
TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
+ TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(partition1, 13L);
@@ -371,20 +380,27 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
+ MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
+ // if there is a source partition, inherit the partition id
+ Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
+ ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
try {
restoreConsumer.reset();
stateMgr.register(store1, true, store1.stateRestoreCallback);
stateMgr.register(store2, true, store2.stateRestoreCallback);
+ stateMgr.register(store3, true, store3.stateRestoreCallback);
Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
- assertEquals(2, changeLogOffsets.size());
+ assertEquals(3, changeLogOffsets.size());
assertTrue(changeLogOffsets.containsKey(partition1));
assertTrue(changeLogOffsets.containsKey(partition2));
+ assertTrue(changeLogOffsets.containsKey(partition3));
assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1));
assertEquals(-1L, (long) changeLogOffsets.get(partition2));
+ assertEquals(-1L, (long) changeLogOffsets.get(partition3));
} finally {
stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -400,7 +416,6 @@ public class ProcessorStateManagerTest {
File baseDir = Files.createTempDirectory(stateDir).toFile();
try {
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
@@ -408,6 +423,7 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
try {
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
@@ -432,7 +448,6 @@ public class ProcessorStateManagerTest {
oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
@@ -450,6 +465,7 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
try {
// make sure the checkpoint file is deleted
assertFalse(checkpointFile.exists());
http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 00b983d..b2f45fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -39,6 +40,7 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -46,6 +48,7 @@ import java.util.Properties;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
public class StandbyTaskTest {
@@ -72,6 +75,15 @@ public class StandbyTaskTest {
)
);
+ private final TopicPartition ktable = new TopicPartition("ktable1", 0);
+ private final Set<TopicPartition> ktablePartitions = Utils.mkSet(ktable);
+ private final ProcessorTopology ktableTopology = new ProcessorTopology(
+ Collections.<ProcessorNode>emptyList(),
+ Collections.<String, SourceNode>emptyMap(),
+ Utils.<StateStoreSupplier>mkList(
+ new MockStateStoreSupplier(ktable.topic(), true, false)
+ )
+ );
private StreamingConfig createConfig(final File baseDir) throws Exception {
return new StreamingConfig(new Properties() {
@@ -116,8 +128,6 @@ public class StandbyTaskTest {
@Test
public void testStorePartitions() throws Exception {
- System.out.println("STARTED");
-
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
@@ -201,6 +211,104 @@ public class StandbyTaskTest {
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testUpdateKTable() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ consumer.assign(Utils.mkList(ktable));
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+ committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(0L));
+ consumer.commitSync(committedOffsets);
+
+ restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
+ new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("ktable1", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
+ ));
+
+ StreamingConfig config = createConfig(baseDir);
+ StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
+
+ restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+ for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 1, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 2, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 3, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 4, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 5, 100))) {
+ restoreStateConsumer.bufferRecord(record);
+ }
+
+ for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
+ TopicPartition partition = entry.getKey();
+ long offset = entry.getValue();
+ if (offset >= 0) {
+ restoreStateConsumer.seek(partition, offset);
+ } else {
+ restoreStateConsumer.seekToBeginning(partition);
+ }
+ }
+
+ // The commit offset is at 0L. Records should not be processed
+ List<ConsumerRecord<byte[], byte[]>> remaining = task.update(ktable, restoreStateConsumer.poll(100).records(ktable));
+ assertEquals(5, remaining.size());
+
+ committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(10L));
+ consumer.commitSync(committedOffsets);
+ task.commit(); // update offset limits
+
+ // The commit offset has not been reached yet.
+ remaining = task.update(ktable, remaining);
+ assertEquals(5, remaining.size());
+
+ committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(11L));
+ consumer.commitSync(committedOffsets);
+ task.commit(); // update offset limits
+
+ // one record should be processed.
+ remaining = task.update(ktable, remaining);
+ assertEquals(4, remaining.size());
+
+ committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(45L));
+ consumer.commitSync(committedOffsets);
+ task.commit(); // update offset limits
+
+ // The commit offset is now 45. All records except for the last one should be processed.
+ remaining = task.update(ktable, remaining);
+ assertEquals(1, remaining.size());
+
+ committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(50L));
+ consumer.commitSync(committedOffsets);
+ task.commit(); // update offset limits
+
+ // The commit offset is now 50. Still the last record remains.
+ remaining = task.update(ktable, remaining);
+ assertEquals(1, remaining.size());
+
+ committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(60L));
+ consumer.commitSync(committedOffsets);
+ task.commit(); // update offset limits
+
+ // The commit offset is now 60. No record should be left.
+ remaining = task.update(ktable, remaining);
+ assertNull(remaining);
+
+ task.close();
+
+ File taskDir = new File(baseDir, taskId.toString());
+ OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+ Map<TopicPartition, Long> offsets = checkpoint.read();
+
+ assertEquals(1, offsets.size());
+ assertEquals(new Long(51L), offsets.get(ktable));
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/268392f5/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 3b17afe..6810841 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -29,10 +29,16 @@ import java.util.ArrayList;
public class MockStateStoreSupplier implements StateStoreSupplier {
private final String name;
private final boolean persistent;
+ private final boolean loggingEnabled;
public MockStateStoreSupplier(String name, boolean persistent) {
+ this(name, persistent, true);
+ }
+
+ public MockStateStoreSupplier(String name, boolean persistent, boolean loggingEnabled) {
this.name = name;
this.persistent = persistent;
+ this.loggingEnabled = loggingEnabled;
}
@Override
@@ -42,13 +48,18 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
@Override
public StateStore get() {
- return new MockStateStore(name, persistent);
+ if (loggingEnabled) {
+ return new MockStateStore(name, persistent);
+ } else {
+ return new MockStateStore(name, persistent).disableLogging();
+ }
}
public static class MockStateStore implements StateStore {
private final String name;
private final boolean persistent;
+ public boolean loggingEnabled = true;
public boolean initialized = false;
public boolean flushed = false;
public boolean closed = false;
@@ -59,6 +70,11 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
this.persistent = persistent;
}
+ public MockStateStore disableLogging() {
+ loggingEnabled = false;
+ return this;
+ }
+
@Override
public String name() {
return name;
@@ -66,7 +82,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
@Override
public void init(ProcessorContext context) {
- context.register(this, true, stateRestoreCallback);
+ context.register(this, loggingEnabled, stateRestoreCallback);
initialized = true;
}