You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/24 05:10:55 UTC
[3/4] kafka git commit: KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger
KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger
The `StoreChangeLogger` currently keeps a cache of dirty and removed keys and batches the changelog records so that we don't send a record for each update. However, with KIP-63 this is unnecessary, as the batching and de-duping are done by the caching layer. Further, the `StoreChangeLogger` relies on `context.timestamp()`, which is likely to be incorrect when caching is enabled.
Author: Damian Guy <da...@gmail.com>
Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang
Closes #2103 from dguy/store-change-logger
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d3003b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d3003b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d3003b3
Branch: refs/heads/0.10.1
Commit: 9d3003b32dfc744c478277123c8878fa65dd9ec7
Parents: c000eb2
Author: Damian Guy <da...@gmail.com>
Authored: Fri Nov 11 10:21:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 08:55:48 2016 -0800
----------------------------------------------------------------------
.../state/internals/CachingKeyValueStore.java | 21 +++--
.../internals/InMemoryKeyValueLoggedStore.java | 23 ++---
.../streams/state/internals/RocksDBStore.java | 24 ++----
.../state/internals/RocksDBWindowStore.java | 26 ++----
.../state/internals/StoreChangeLogger.java | 89 +++-----------------
.../internals/RocksDBKeyValueStoreTest.java | 8 +-
.../state/internals/StoreChangeLoggerTest.java | 53 +++---------
.../apache/kafka/test/KStreamTestDriver.java | 5 +-
8 files changed, 61 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 81ff5b5..ab050b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
-import java.util.ArrayList;
import java.util.List;
class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStore<K, V> {
@@ -78,27 +77,27 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> entries) {
- final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
for (ThreadCache.DirtyEntry entry : entries) {
- keyValues.add(KeyValue.pair(entry.key(), entry.newValue()));
- maybeForward(entry, (InternalProcessorContext) context);
+ putAndMaybeForward(entry, (InternalProcessorContext) context);
}
- underlying.putAll(keyValues);
}
});
}
- private void maybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
- if (flushListener != null) {
- final RecordContext current = context.recordContext();
+ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
+ final RecordContext current = context.recordContext();
+ try {
context.setRecordContext(entry.recordContext());
- try {
+ if (flushListener != null) {
+
flushListener.apply(serdes.keyFrom(entry.key().get()),
serdes.valueFrom(entry.newValue()), serdes.valueFrom(underlying.get(entry.key())));
- } finally {
- context.setRecordContext(current);
+
}
+ underlying.put(entry.key(), entry.newValue());
+ } finally {
+ context.setRecordContext(current);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 4f056ec..d81f6fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -35,7 +35,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
private final String storeName;
private StoreChangeLogger<K, V> changeLogger;
- private StoreChangeLogger.ValueGetter<K, V> getter;
+ private ProcessorContext context;
public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
this.storeName = storeName;
@@ -52,6 +52,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context, StateStore root) {
+ this.context = context;
inner.init(context, root);
// construct the serde
@@ -61,12 +62,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
- this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
- @Override
- public V get(K key) {
- return inner.get(key);
- }
- };
// if the inner store is an LRU cache, add the eviction listener to log removed record
if (inner instanceof MemoryLRUCache) {
@@ -98,16 +93,14 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
public void put(K key, V value) {
this.inner.put(key, value);
- changeLogger.add(key);
- changeLogger.maybeLogChange(this.getter);
+ changeLogger.logChange(key, value);
}
@Override
public V putIfAbsent(K key, V value) {
V originalValue = this.inner.putIfAbsent(key, value);
if (originalValue == null) {
- changeLogger.add(key);
- changeLogger.maybeLogChange(this.getter);
+ changeLogger.logChange(key, value);
}
return originalValue;
}
@@ -118,9 +111,8 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
for (KeyValue<K, V> entry : entries) {
K key = entry.key;
- changeLogger.add(key);
+ changeLogger.logChange(key, entry.value);
}
- changeLogger.maybeLogChange(this.getter);
}
@Override
@@ -139,8 +131,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
* @param key the key for the entry that the inner store removed
*/
protected void removed(K key) {
- changeLogger.delete(key);
- changeLogger.maybeLogChange(this.getter);
+ changeLogger.logChange(key, null);
}
@Override
@@ -166,7 +157,5 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
@Override
public void flush() {
this.inner.flush();
-
- changeLogger.logChange(getter);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e27ffd8..41d633e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -98,9 +98,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private boolean loggingEnabled = false;
private StoreChangeLogger<Bytes, byte[]> changeLogger;
- private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
protected volatile boolean open = false;
+ private ProcessorContext context;
public KeyValueStore<K, V> enableLogging() {
loggingEnabled = true;
@@ -142,6 +142,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@SuppressWarnings("unchecked")
public void openDB(ProcessorContext context) {
+ this.context = context;
final Map<String, Object> configs = context.appConfigs();
final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
if (configSetterClass != null) {
@@ -165,13 +166,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
- this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
- @Override
- public byte[] get(Bytes key) {
- return getInternal(key.get());
- }
- };
-
context.register(root, loggingEnabled, new StateRestoreCallback() {
@Override
@@ -247,8 +241,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
putInternal(rawKey, rawValue);
if (loggingEnabled) {
- changeLogger.add(Bytes.wrap(rawKey));
- changeLogger.maybeLogChange(this.getter);
+ changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
}
}
@@ -292,16 +285,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
if (entry.value == null) {
db.remove(rawKey);
} else {
- batch.put(rawKey, serdes.rawValue(entry.value));
+ final byte[] value = serdes.rawValue(entry.value);
+ batch.put(rawKey, value);
if (loggingEnabled) {
- changeLogger.add(Bytes.wrap(rawKey));
+ changeLogger.logChange(Bytes.wrap(rawKey), value);
}
}
}
db.write(wOptions, batch);
- if (loggingEnabled) {
- changeLogger.maybeLogChange(getter);
- }
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
}
@@ -371,9 +362,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
if (db == null) {
return;
}
- if (loggingEnabled) {
- changeLogger.logChange(getter);
- }
// flush RocksDB
flushInternal();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index dd24320..b563137 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -142,7 +142,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final SimpleDateFormat formatter;
- private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
private ProcessorContext context;
@@ -166,11 +165,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
this.retainDuplicates = retainDuplicates;
- this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
- public byte[] get(Bytes key) {
- return getInternal(key.get());
- }
- };
// Create a date formatter. Formatted timestamps are used as segment name suffixes
this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
@@ -262,9 +256,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
segment.flush();
}
}
-
- if (loggingEnabled)
- changeLogger.logChange(this.getter);
}
@Override
@@ -279,25 +270,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@Override
public void put(K key, V value) {
- byte[] rawKey = putAndReturnInternalKey(key, value, context.timestamp());
-
- if (rawKey != null && loggingEnabled) {
- changeLogger.add(Bytes.wrap(rawKey));
- changeLogger.maybeLogChange(this.getter);
- }
+ put(key, value, context.timestamp());
}
@Override
public void put(K key, V value, long timestamp) {
- byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
+ final byte[] rawValue = serdes.rawValue(value);
+ byte[] rawKey = putAndReturnInternalKey(key, rawValue, timestamp);
if (rawKey != null && loggingEnabled) {
- changeLogger.add(Bytes.wrap(rawKey));
- changeLogger.maybeLogChange(this.getter);
+ changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
}
}
- private byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
+ private byte[] putAndReturnInternalKey(K key, byte[] value, long timestamp) {
long segmentId = segmentId(timestamp);
if (segmentId > currentSegmentId) {
@@ -312,7 +298,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
if (retainDuplicates)
seqnum = (seqnum + 1) & 0x7FFFFFFF;
byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
- segment.put(Bytes.wrap(binaryKey), serdes.rawValue(value));
+ segment.put(Bytes.wrap(binaryKey), value);
return binaryKey;
} else {
return null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 41f9ae2..7aaddf8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -24,12 +24,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;
-import java.util.HashSet;
-import java.util.Set;
-
/**
- * Store change log collector that batches updates before sending to Kafka.
- *
* Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
* If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
* i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
@@ -37,97 +32,33 @@ import java.util.Set;
* @param <K>
* @param <V>
*/
-public class StoreChangeLogger<K, V> {
-
- public interface ValueGetter<K, V> {
- V get(K key);
- }
-
- // TODO: these values should be configurable
- protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
+class StoreChangeLogger<K, V> {
protected final StateSerdes<K, V> serialization;
private final String topic;
private final int partition;
private final ProcessorContext context;
- private final int maxDirty;
- private final int maxRemoved;
-
- protected Set<K> dirty;
- protected Set<K> removed;
+ private final RecordCollector collector;
- public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
- this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
- }
- public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
- this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
- init();
+ StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
+ this(storeName, context, context.taskId().partition, serialization);
}
- protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
+ private StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization) {
this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
this.context = context;
this.partition = partition;
this.serialization = serialization;
- this.maxDirty = maxDirty;
- this.maxRemoved = maxRemoved;
- }
-
- public void init() {
- this.dirty = new HashSet<>();
- this.removed = new HashSet<>();
+ this.collector = ((RecordCollector.Supplier) context).recordCollector();
}
- public void add(K key) {
- this.dirty.add(key);
- this.removed.remove(key);
- }
-
- public void delete(K key) {
- this.dirty.remove(key);
- this.removed.add(key);
- }
-
- public void maybeLogChange(ValueGetter<K, V> getter) {
- if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
- logChange(getter);
- }
-
- public void logChange(ValueGetter<K, V> getter) {
- if (this.removed.isEmpty() && this.dirty.isEmpty())
- return;
-
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+ void logChange(final K key, final V value) {
if (collector != null) {
- Serializer<K> keySerializer = serialization.keySerializer();
- Serializer<V> valueSerializer = serialization.valueSerializer();
-
- for (K k : this.removed) {
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
- }
- for (K k : this.dirty) {
- V v = getter.get(k);
- collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), k, v), keySerializer, valueSerializer);
- }
- this.removed.clear();
- this.dirty.clear();
+ final Serializer<K> keySerializer = serialization.keySerializer();
+ final Serializer<V> valueSerializer = serialization.valueSerializer();
+ collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), key, value), keySerializer, valueSerializer);
}
}
-
- public void clear() {
- this.removed.clear();
- this.dirty.clear();
- }
-
- // this is for test only
- public int numDirty() {
- return this.dirty.size();
- }
-
- // this is for test only
- public int numRemoved() {
- return this.removed.size();
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 25e0620..c3190b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -97,7 +97,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@Test
public void shouldPerformRangeQueriesWithCachingDisabled() throws Exception {
final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+ final MockProcessorContext context = (MockProcessorContext) driver.context();
+ final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+ context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
final KeyValueIterator<Integer, String> range = store.range(1, 2);
@@ -109,7 +111,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@Test
public void shouldPerformAllQueriesWithCachingDisabled() throws Exception {
final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+ final MockProcessorContext context = (MockProcessorContext) driver.context();
+ final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+ context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
final KeyValueIterator<Integer, String> range = store.all();
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7675f9b..fbfffb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -32,13 +32,13 @@ import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
public class StoreChangeLoggerTest {
private final String topic = "topic";
private final Map<Integer, String> logged = new HashMap<>();
- private final Map<Integer, String> written = new HashMap<>();
private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
new RecordCollector(null, "StoreChangeLoggerTest") {
@@ -57,49 +57,22 @@ public class StoreChangeLoggerTest {
}
);
- private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
+ private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
- private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
- @Override
- public String get(Integer key) {
- return written.get(key);
- }
- };
@Test
- public void testAddRemove() {
+ public void testAddRemove() throws Exception {
context.setTime(1);
- written.put(0, "zero");
- changeLogger.add(0);
- written.put(1, "one");
- changeLogger.add(1);
- written.put(2, "two");
- changeLogger.add(2);
- assertEquals(3, changeLogger.numDirty());
- assertEquals(0, changeLogger.numRemoved());
-
- changeLogger.delete(0);
- changeLogger.delete(1);
- written.put(3, "three");
- changeLogger.add(3);
- assertEquals(2, changeLogger.numDirty());
- assertEquals(2, changeLogger.numRemoved());
-
- written.put(0, "zero-again");
- changeLogger.add(0);
- assertEquals(3, changeLogger.numDirty());
- assertEquals(1, changeLogger.numRemoved());
-
- written.put(4, "four");
- changeLogger.add(4);
- changeLogger.maybeLogChange(getter);
- assertEquals(0, changeLogger.numDirty());
- assertEquals(0, changeLogger.numRemoved());
- assertEquals(5, logged.size());
- assertEquals("zero-again", logged.get(0));
- assertEquals(null, logged.get(1));
+ changeLogger.logChange(0, "zero");
+ changeLogger.logChange(1, "one");
+ changeLogger.logChange(2, "two");
+
+ assertEquals("zero", logged.get(0));
+ assertEquals("one", logged.get(1));
assertEquals("two", logged.get(2));
- assertEquals("three", logged.get(3));
- assertEquals("four", logged.get(4));
+
+ changeLogger.logChange(0, null);
+ assertNull(logged.get(0));
+
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 05abbc6..14e15a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -100,12 +100,15 @@ public class KStreamTestDriver {
}
public void process(String topicName, Object key, Object value) {
+ final ProcessorNode previous = currNode;
currNode = topology.source(topicName);
// if currNode is null, check if this topic is a changelog topic;
// if yes, skip
- if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))
+ if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
+ currNode = previous;
return;
+ }
context.setRecordContext(createRecordContext(context.timestamp()));
context.setCurrentNode(currNode);
try {